ctdb/server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* list of "ctdb ipreallocate" processes to call back when we have
34    finished the takeover run.
35 */
36 struct ip_reallocate_list {
37         struct ip_reallocate_list *next;
38         struct rd_memdump_reply *rd;
39 };
40
41 struct ctdb_banning_state {
42         uint32_t count;
43         struct timeval last_reported_time;
44 };
45
46 /*
47   private state of recovery daemon
48  */
49 struct ctdb_recoverd {
50         struct ctdb_context *ctdb;
51         uint32_t recmaster;
52         uint32_t num_active;
53         uint32_t num_connected;
54         uint32_t last_culprit_node;
55         struct ctdb_node_map *nodemap;
56         struct timeval priority_time;
57         bool need_takeover_run;
58         bool need_recovery;
59         uint32_t node_flags;
60         struct timed_event *send_election_te;
61         struct timed_event *election_timeout;
62         struct vacuum_info *vacuum_info;
63         TALLOC_CTX *ip_reallocate_ctx;
64         struct ip_reallocate_list *reallocate_callers;
65         TALLOC_CTX *ip_check_disable_ctx;
66         struct ctdb_control_get_ifaces *ifaces;
67         TALLOC_CTX *deferred_rebalance_ctx;
68 };
69
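/* Timeouts used for the recovery controls and for monitoring below; both
   are derived from the recover_timeout and recover_interval tunables. */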
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
102 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit >= ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
164 /*
165   remember the trouble maker
166  */
167 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
168 {
169         ctdb_set_culprit_count(rec, culprit, 1);
170 }
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222
223         if (node_pnn == ctdb->pnn) {
224                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
225         }
226 }
227
228 /*
229   update the node capabilities for all connected nodes
230  */
231 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
232 {
233         uint32_t *nodes;
234         TALLOC_CTX *tmp_ctx;
235
236         tmp_ctx = talloc_new(ctdb);
237         CTDB_NO_MEMORY(ctdb, tmp_ctx);
238
239         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
240         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
241                                         nodes, 0,
242                                         CONTROL_TIMEOUT(),
243                                         false, tdb_null,
244                                         async_getcap_callback, NULL,
245                                         NULL) != 0) {
246                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
247                 talloc_free(tmp_ctx);
248                 return -1;
249         }
250
251         talloc_free(tmp_ctx);
252         return 0;
253 }
254
255 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
256 {
257         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
258
259         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
260         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
261 }
262
263 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
264 {
265         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
266
267         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
268         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
269 }
270
271 /*
272   change recovery mode on all nodes
273  */
274 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
275 {
276         TDB_DATA data;
277         uint32_t *nodes;
278         TALLOC_CTX *tmp_ctx;
279
280         tmp_ctx = talloc_new(ctdb);
281         CTDB_NO_MEMORY(ctdb, tmp_ctx);
282
283         /* freeze all nodes */
284         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
285         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
286                 int i;
287
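                /* When entering recovery, freeze the databases on all active
                   nodes one priority level at a time (1..NUM_DB_PRIORITIES)
                   before flipping the recovery mode below. */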
288                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
289                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
290                                                 nodes, i,
291                                                 CONTROL_TIMEOUT(),
292                                                 false, tdb_null,
293                                                 NULL,
294                                                 set_recmode_fail_callback,
295                                                 rec) != 0) {
296                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
297                                 talloc_free(tmp_ctx);
298                                 return -1;
299                         }
300                 }
301         }
302
303
304         data.dsize = sizeof(uint32_t);
305         data.dptr = (unsigned char *)&rec_mode;
306
307         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
308                                         nodes, 0,
309                                         CONTROL_TIMEOUT(),
310                                         false, data,
311                                         NULL, NULL,
312                                         NULL) != 0) {
313                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
314                 talloc_free(tmp_ctx);
315                 return -1;
316         }
317
318         talloc_free(tmp_ctx);
319         return 0;
320 }
321
322 /*
323   change recovery master on all nodes
324  */
325 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
326 {
327         TDB_DATA data;
328         TALLOC_CTX *tmp_ctx;
329         uint32_t *nodes;
330
331         tmp_ctx = talloc_new(ctdb);
332         CTDB_NO_MEMORY(ctdb, tmp_ctx);
333
334         data.dsize = sizeof(uint32_t);
335         data.dptr = (unsigned char *)&pnn;
336
337         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
339                                         nodes, 0,
340                                         CONTROL_TIMEOUT(), false, data,
341                                         NULL, NULL,
342                                         NULL) != 0) {
343                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
344                 talloc_free(tmp_ctx);
345                 return -1;
346         }
347
348         talloc_free(tmp_ctx);
349         return 0;
350 }
351
352 /* update all remote nodes to use the same db priority that we have.
353    This can fail if the remote node has not yet been upgraded to
354    support this function, so we always return success and never fail
355    a recovery if this call fails.
356 */
357 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
358         struct ctdb_node_map *nodemap, 
359         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
360 {
361         int db;
362         uint32_t *nodes;
363
364         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
365
366         /* step through all local databases */
367         for (db=0; db<dbmap->num;db++) {
368                 TDB_DATA data;
369                 struct ctdb_db_priority db_prio;
370                 int ret;
371
372                 db_prio.db_id     = dbmap->dbs[db].dbid;
373                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
376                         continue;
377                 }
378
379                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
380
381                 data.dptr  = (uint8_t *)&db_prio;
382                 data.dsize = sizeof(db_prio);
383
384                 if (ctdb_client_async_control(ctdb,
385                                         CTDB_CONTROL_SET_DB_PRIORITY,
386                                         nodes, 0,
387                                         CONTROL_TIMEOUT(), false, data,
388                                         NULL, NULL,
389                                         NULL) != 0) {
390                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
391                 }
392         }
393
394         return 0;
395 }                       
396
397 /*
398   ensure all other nodes have attached to any databases that we have
399  */
400 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
401                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
402 {
403         int i, j, db, ret;
404         struct ctdb_dbid_map *remote_dbmap;
405
406         /* verify that all other nodes have all our databases */
407         for (j=0; j<nodemap->num; j++) {
408                 /* we don't need to check ourselves */
409                 if (nodemap->nodes[j].pnn == pnn) {
410                         continue;
411                 }
412                 /* dont check nodes that are unavailable */
413                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
414                         continue;
415                 }
416
417                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
418                                          mem_ctx, &remote_dbmap);
419                 if (ret != 0) {
420                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
421                         return -1;
422                 }
423
424                 /* step through all local databases */
425                 for (db=0; db<dbmap->num;db++) {
426                         const char *name;
427
428
429                         for (i=0;i<remote_dbmap->num;i++) {
430                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
431                                         break;
432                                 }
433                         }
434                         /* the remote node already has this database */
435                         if (i!=remote_dbmap->num) {
436                                 continue;
437                         }
438                         /* ok so we need to create this database */
439                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
440                                                   dbmap->dbs[db].dbid, mem_ctx, &name);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
443                                 return -1;
444                         }
445                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
446                                                  mem_ctx, name,
447                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
448                         if (ret != 0) {
449                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
450                                 return -1;
451                         }
452                 }
453         }
454
455         return 0;
456 }
457
458
459 /*
460   ensure we are attached to any databases that anyone else is attached to
461  */
462 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
463                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
464 {
465         int i, j, db, ret;
466         struct ctdb_dbid_map *remote_dbmap;
467
468         /* verify that we have all databases that any other node has */
469         for (j=0; j<nodemap->num; j++) {
470                 /* we don't need to check ourselves */
471                 if (nodemap->nodes[j].pnn == pnn) {
472                         continue;
473                 }
474                 /* dont check nodes that are unavailable */
475                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
476                         continue;
477                 }
478
479                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
480                                          mem_ctx, &remote_dbmap);
481                 if (ret != 0) {
482                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
483                         return -1;
484                 }
485
486                 /* step through all databases on the remote node */
487                 for (db=0; db<remote_dbmap->num;db++) {
488                         const char *name;
489
490                         for (i=0;i<(*dbmap)->num;i++) {
491                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
492                                         break;
493                                 }
494                         }
495                         /* we already have this db locally */
496                         if (i!=(*dbmap)->num) {
497                                 continue;
498                         }
499                         /* ok so we need to create this database and
500                            rebuild dbmap
501                          */
502                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
503                                                   remote_dbmap->dbs[db].dbid, mem_ctx, &name);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
506                                           nodemap->nodes[j].pnn));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
510                                                  remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
513                                 return -1;
514                         }
515                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
516                         if (ret != 0) {
517                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
518                                 return -1;
519                         }
520                 }
521         }
522
523         return 0;
524 }
525
526
527 /*
528   pull the remote database contents from one node into the recdb
529  */
530 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
531                                     struct tdb_wrap *recdb, uint32_t dbid)
532 {
533         int ret;
534         TDB_DATA outdata;
535         struct ctdb_marshall_buffer *reply;
536         struct ctdb_rec_data *rec;
537         int i;
538         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
539
540         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
541                                CONTROL_TIMEOUT(), &outdata);
542         if (ret != 0) {
543                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
544                 talloc_free(tmp_ctx);
545                 return -1;
546         }
547
548         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
549
550         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
551                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
552                 talloc_free(tmp_ctx);
553                 return -1;
554         }
555         
556         rec = (struct ctdb_rec_data *)&reply->data[0];
557         
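        /* The pulldb reply is a marshall buffer: "count" records packed back
           to back, each starting with a struct ctdb_rec_data whose "length"
           field gives the offset to the next record.  A sketch of the assumed
           layout (see ctdb_private.h):

             struct ctdb_rec_data {
                     uint32_t length;   // total length of this record
                     uint32_t reqid;
                     uint32_t keylen;
                     uint32_t datalen;
                     uint8_t  data[1];  // keylen bytes of key, then datalen bytes of data
             };
        */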
558         for (i=0;
559              i<reply->count;
560              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
561                 TDB_DATA key, data;
562                 struct ctdb_ltdb_header *hdr;
563                 TDB_DATA existing;
564                 
565                 key.dptr = &rec->data[0];
566                 key.dsize = rec->keylen;
567                 data.dptr = &rec->data[key.dsize];
568                 data.dsize = rec->datalen;
569                 
570                 hdr = (struct ctdb_ltdb_header *)data.dptr;
571
572                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
573                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
574                         talloc_free(tmp_ctx);
575                         return -1;
576                 }
577
578                 /* fetch the existing record, if any */
579                 existing = tdb_fetch(recdb->tdb, key);
580                 
581                 if (existing.dptr != NULL) {
582                         struct ctdb_ltdb_header header;
583                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
584                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
585                                          (unsigned)existing.dsize, srcnode));
586                                 free(existing.dptr);
587                                 talloc_free(tmp_ctx);
588                                 return -1;
589                         }
590                         header = *(struct ctdb_ltdb_header *)existing.dptr;
591                         free(existing.dptr);
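                        /* Merge rule: only overwrite the copy already in the
                           recdb if the incoming record has a newer RSN, or the
                           same RSN while the existing copy was not written by
                           the current recovery master. */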
592                         if (!(header.rsn < hdr->rsn ||
593                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
594                                 continue;
595                         }
596                 }
597                 
598                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
599                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
600                         talloc_free(tmp_ctx);
601                         return -1;                              
602                 }
603         }
604
605         talloc_free(tmp_ctx);
606
607         return 0;
608 }
609
610
611 struct pull_seqnum_cbdata {
612         int failed;
613         uint32_t pnn;
614         uint64_t seqnum;
615 };
616
617 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
618 {
619         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
620         uint64_t seqnum;
621
622         if (cb_data->failed != 0) {
623                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
624                 return;
625         }
626
627         if (res != 0) {
628                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
629                 cb_data->failed = 1;
630                 return;
631         }
632
633         if (outdata.dsize != sizeof(uint64_t)) {
634                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
635                 cb_data->failed = -1;
636                 return;
637         }
638
639         seqnum = *((uint64_t *)outdata.dptr);
640
641         if (seqnum > cb_data->seqnum) {
642                 cb_data->seqnum = seqnum;
643                 cb_data->pnn = node_pnn;
644         }
645 }
646
647 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
648 {
649         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
650
651         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
652         cb_data->failed = 1;
653 }
654
655 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
656                                 struct ctdb_recoverd *rec, 
657                                 struct ctdb_node_map *nodemap, 
658                                 struct tdb_wrap *recdb, uint32_t dbid)
659 {
660         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
661         uint32_t *nodes;
662         TDB_DATA data;
663         uint32_t outdata[2];
664         struct pull_seqnum_cbdata *cb_data;
665
666         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
667
668         outdata[0] = dbid;
669         outdata[1] = 0;
670
671         data.dsize = sizeof(outdata);
672         data.dptr  = (uint8_t *)&outdata[0];
673
674         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
675         if (cb_data == NULL) {
676                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
677                 talloc_free(tmp_ctx);
678                 return -1;
679         }
680
681         cb_data->failed = 0;
682         cb_data->pnn    = -1;
683         cb_data->seqnum = 0;
684         
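        /* Ask every active node for its sequence number for this persistent
           database; the callbacks above remember the node holding the highest
           one, and that single node is then used as the pull source. */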
685         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
686         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
687                                         nodes, 0,
688                                         CONTROL_TIMEOUT(), false, data,
689                                         pull_seqnum_cb,
690                                         pull_seqnum_fail_cb,
691                                         cb_data) != 0) {
692                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
693
694                 talloc_free(tmp_ctx);
695                 return -1;
696         }
697
698         if (cb_data->failed != 0) {
699                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
700                 talloc_free(tmp_ctx);
701                 return -1;
702         }
703
704         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
705                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
706                 talloc_free(tmp_ctx);
707                 return -1;
708         }
709
710         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
711
712         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
713                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
714                 talloc_free(tmp_ctx);
715                 return -1;
716         }
717
718         talloc_free(tmp_ctx);
719         return 0;
720 }
721
722
723 /*
724   pull all the remote database contents into the recdb
725  */
726 static int pull_remote_database(struct ctdb_context *ctdb,
727                                 struct ctdb_recoverd *rec, 
728                                 struct ctdb_node_map *nodemap, 
729                                 struct tdb_wrap *recdb, uint32_t dbid,
730                                 bool persistent)
731 {
732         int j;
733
734         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
735                 int ret;
736                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
737                 if (ret == 0) {
738                         return 0;
739                 }
740         }
741
742         /* pull all records from all other nodes across onto this node
743            (this merges based on rsn)
744         */
745         for (j=0; j<nodemap->num; j++) {
746                 /* dont merge from nodes that are unavailable */
747                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
748                         continue;
749                 }
750                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
751                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
752                                  nodemap->nodes[j].pnn));
753                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
754                         return -1;
755                 }
756         }
757         
758         return 0;
759 }
760
761
762 /*
763   update flags on all active nodes
764  */
765 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
766 {
767         int ret;
768
769         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
772                 return -1;
773         }
774
775         return 0;
776 }
777
778 /*
779   ensure all nodes have the same vnnmap we do
780  */
781 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
782                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
783 {
784         int j, ret;
785
786         /* push the new vnn map out to all the nodes */
787         for (j=0; j<nodemap->num; j++) {
788                 /* dont push to nodes that are unavailable */
789                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
790                         continue;
791                 }
792
793                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
794                 if (ret != 0) {
795                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
796                         return -1;
797                 }
798         }
799
800         return 0;
801 }
802
803
804 struct vacuum_info {
805         struct vacuum_info *next, *prev;
806         struct ctdb_recoverd *rec;
807         uint32_t srcnode;
808         struct ctdb_db_context *ctdb_db;
809         struct ctdb_marshall_buffer *recs;
810         struct ctdb_rec_data *r;
811 };
812
813 static void vacuum_fetch_next(struct vacuum_info *v);
814
815 /*
816   called when a vacuum fetch has completed - just free it and do the next one
817  */
818 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
819 {
820         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
821         talloc_free(state);
822         vacuum_fetch_next(v);
823 }
824
825
826 /*
827   process the next element from the vacuum list
828 */
829 static void vacuum_fetch_next(struct vacuum_info *v)
830 {
831         struct ctdb_call call;
832         struct ctdb_rec_data *r;
833
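        /* Walk the marshalled record list and migrate each record to this
           node by sending a no-op call (CTDB_NULL_FUNC) with the
           immediate/vacuum migration flags set; the record data moves here
           as a side effect of the migration. */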
834         while (v->recs->count) {
835                 struct ctdb_client_call_state *state;
836                 TDB_DATA data;
837                 struct ctdb_ltdb_header *hdr;
838
839                 ZERO_STRUCT(call);
840                 call.call_id = CTDB_NULL_FUNC;
841                 call.flags = CTDB_IMMEDIATE_MIGRATION;
842                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
843
844                 r = v->r;
845                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
846                 v->recs->count--;
847
848                 call.key.dptr = &r->data[0];
849                 call.key.dsize = r->keylen;
850
851                 /* ensure we don't block this daemon - just skip a record if we can't get
852                    the chainlock */
853                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
854                         continue;
855                 }
856
857                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
858                 if (data.dptr == NULL) {
859                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
860                         continue;
861                 }
862
863                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
864                         free(data.dptr);
865                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
866                         continue;
867                 }
868                 
869                 hdr = (struct ctdb_ltdb_header *)data.dptr;
870                 if (hdr->dmaster == v->rec->ctdb->pnn) {
871                         /* its already local */
872                         free(data.dptr);
873                         /* it's already local */
874                         continue;
875                 }
876
877                 free(data.dptr);
878
879                 state = ctdb_call_send(v->ctdb_db, &call);
880                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
881                 if (state == NULL) {
882                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
883                         talloc_free(v);
884                         return;
885                 }
886                 state->async.fn = vacuum_fetch_callback;
887                 state->async.private_data = v;
888                 return;
889         }
890
891         talloc_free(v);
892 }
893
894
895 /*
896   destroy a vacuum info structure
897  */
898 static int vacuum_info_destructor(struct vacuum_info *v)
899 {
900         DLIST_REMOVE(v->rec->vacuum_info, v);
901         return 0;
902 }
903
904
905 /*
906   handler for vacuum fetch
907 */
908 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
909                                  TDB_DATA data, void *private_data)
910 {
911         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
912         struct ctdb_marshall_buffer *recs;
913         int ret, i;
914         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
915         const char *name;
916         struct ctdb_dbid_map *dbmap=NULL;
917         bool persistent = false;
918         struct ctdb_db_context *ctdb_db;
919         struct ctdb_rec_data *r;
920         uint32_t srcnode;
921         struct vacuum_info *v;
922
923         recs = (struct ctdb_marshall_buffer *)data.dptr;
924         r = (struct ctdb_rec_data *)&recs->data[0];
925
926         if (recs->count == 0) {
927                 talloc_free(tmp_ctx);
928                 return;
929         }
930
931         srcnode = r->reqid;
932
933         for (v=rec->vacuum_info;v;v=v->next) {
934                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
935                         /* we're already working on records from this node */
936                         talloc_free(tmp_ctx);
937                         return;
938                 }
939         }
940
941         /* work out if the database is persistent */
942         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
943         if (ret != 0) {
944                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
945                 talloc_free(tmp_ctx);
946                 return;
947         }
948
949         for (i=0;i<dbmap->num;i++) {
950                 if (dbmap->dbs[i].dbid == recs->db_id) {
951                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
952                         break;
953                 }
954         }
955         if (i == dbmap->num) {
956                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
957                 talloc_free(tmp_ctx);
958                 return;         
959         }
960
961         /* find the name of this database */
962         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
963                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
964                 talloc_free(tmp_ctx);
965                 return;
966         }
967
968         /* attach to it */
969         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
970         if (ctdb_db == NULL) {
971                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
972                 talloc_free(tmp_ctx);
973                 return;
974         }
975
976         v = talloc_zero(rec, struct vacuum_info);
977         if (v == NULL) {
978                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
979                 talloc_free(tmp_ctx);
980                 return;
981         }
982
983         v->rec = rec;
984         v->srcnode = srcnode;
985         v->ctdb_db = ctdb_db;
986         v->recs = talloc_memdup(v, recs, data.dsize);
987         if (v->recs == NULL) {
988                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
989                 talloc_free(v);
990                 talloc_free(tmp_ctx);
991                 return;         
992         }
993         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
994
995         DLIST_ADD(rec->vacuum_info, v);
996
997         talloc_set_destructor(v, vacuum_info_destructor);
998
999         vacuum_fetch_next(v);
1000         talloc_free(tmp_ctx);
1001 }
1002
1003
1004 /*
1005   called when ctdb_wait_timeout should finish
1006  */
1007 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1008                               struct timeval yt, void *p)
1009 {
1010         uint32_t *timed_out = (uint32_t *)p;
1011         (*timed_out) = 1;
1012 }
1013
1014 /*
1015   wait for a given number of seconds
1016  */
1017 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1018 {
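        /* Synchronous wait on the event loop: schedule a one-shot timed
           event and keep running event_loop_once() until its handler flips
           the timed_out flag. */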
1019         uint32_t timed_out = 0;
1020         time_t usecs = (secs - (time_t)secs) * 1000000;
1021         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1022         while (!timed_out) {
1023                 event_loop_once(ctdb->ev);
1024         }
1025 }
1026
1027 /*
1028   called when an election times out (ends)
1029  */
1030 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1031                                   struct timeval t, void *p)
1032 {
1033         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1034         rec->election_timeout = NULL;
1035         fast_start = false;
1036
1037         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1038 }
1039
1040
1041 /*
1042   wait for an election to finish. It finished election_timeout seconds after
1043   the last election packet is received
1044  */
1045 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1046 {
1047         struct ctdb_context *ctdb = rec->ctdb;
1048         while (rec->election_timeout) {
1049                 event_loop_once(ctdb->ev);
1050         }
1051 }
1052
1053 /*
1054   Update our local flags from all remote connected nodes. 
1055   This is only run when we are, or believe we are, the recovery master
1056  */
1057 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1058 {
1059         int j;
1060         struct ctdb_context *ctdb = rec->ctdb;
1061         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1062
1063         /* get the nodemap for all active remote nodes and verify
1064            they are the same as for this node
1065          */
1066         for (j=0; j<nodemap->num; j++) {
1067                 struct ctdb_node_map *remote_nodemap=NULL;
1068                 int ret;
1069
1070                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1071                         continue;
1072                 }
1073                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1074                         continue;
1075                 }
1076
1077                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1078                                            mem_ctx, &remote_nodemap);
1079                 if (ret != 0) {
1080                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1081                                   nodemap->nodes[j].pnn));
1082                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1083                         talloc_free(mem_ctx);
1084                         return MONITOR_FAILED;
1085                 }
1086                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1087                         /* We should tell our daemon about this so it
1088                            updates its flags or else we will log the same 
1089                            message again in the next iteration of recovery.
1090                            Since we are the recovery master we can just as
1091                            well update the flags on all nodes.
1092                         */
1093                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1094                         if (ret != 0) {
1095                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1096                                 return -1;
1097                         }
1098
1099                         /* Update our local copy of the flags in the recovery
1100                            daemon.
1101                         */
1102                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1103                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1104                                  nodemap->nodes[j].flags));
1105                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1106                 }
1107                 talloc_free(remote_nodemap);
1108         }
1109         talloc_free(mem_ctx);
1110         return MONITOR_OK;
1111 }
1112
1113
1114 /* Create a new random generation id.
1115    The generation id cannot be the INVALID_GENERATION id.
1116 */
1117 static uint32_t new_generation(void)
1118 {
1119         uint32_t generation;
1120
1121         while (1) {
1122                 generation = random();
1123
1124                 if (generation != INVALID_GENERATION) {
1125                         break;
1126                 }
1127         }
1128
1129         return generation;
1130 }
1131
1132
1133 /*
1134   create a temporary working database
1135  */
1136 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1137 {
1138         char *name;
1139         struct tdb_wrap *recdb;
1140         unsigned tdb_flags;
1141
1142         /* open up the temporary recovery database */
1143         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1144                                ctdb->db_directory_state,
1145                                ctdb->pnn);
1146         if (name == NULL) {
1147                 return NULL;
1148         }
1149         unlink(name);
1150
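        /* The scratch recovery database is private to this process, so TDB
           locking is disabled; mmap is avoided under valgrind and transaction
           nesting is disallowed to catch misuse. */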
1151         tdb_flags = TDB_NOLOCK;
1152         if (ctdb->valgrinding) {
1153                 tdb_flags |= TDB_NOMMAP;
1154         }
1155         tdb_flags |= TDB_DISALLOW_NESTING;
1156
1157         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1158                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1159         if (recdb == NULL) {
1160                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1161         }
1162
1163         talloc_free(name);
1164
1165         return recdb;
1166 }
1167
1168
1169 /* 
1170    a traverse function for pulling all relevant records from recdb
1171  */
1172 struct recdb_data {
1173         struct ctdb_context *ctdb;
1174         struct ctdb_marshall_buffer *recdata;
1175         uint32_t len;
1176         bool failed;
1177         bool persistent;
1178 };
1179
1180 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1181 {
1182         struct recdb_data *params = (struct recdb_data *)p;
1183         struct ctdb_rec_data *rec;
1184         struct ctdb_ltdb_header *hdr;
1185
1186         /* skip empty records */
1187         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1188                 return 0;
1189         }
1190
1191         /* update the dmaster field to point to us */
1192         hdr = (struct ctdb_ltdb_header *)data.dptr;
1193         if (!params->persistent) {
1194                 hdr->dmaster = params->ctdb->pnn;
1195                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1196         }
1197
1198         /* add the record to the blob ready to send to the nodes */
1199         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1200         if (rec == NULL) {
1201                 params->failed = true;
1202                 return -1;
1203         }
1204         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1205         if (params->recdata == NULL) {
1206                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1207                          rec->length + params->len, params->recdata->count));
1208                 params->failed = true;
1209                 return -1;
1210         }
1211         params->recdata->count++;
1212         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1213         params->len += rec->length;
1214         talloc_free(rec);
1215
1216         return 0;
1217 }
1218
1219 /*
1220   push the recdb database out to all nodes
1221  */
1222 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1223                                bool persistent,
1224                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1225 {
1226         struct recdb_data params;
1227         struct ctdb_marshall_buffer *recdata;
1228         TDB_DATA outdata;
1229         TALLOC_CTX *tmp_ctx;
1230         uint32_t *nodes;
1231
1232         tmp_ctx = talloc_new(ctdb);
1233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1234
1235         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1236         CTDB_NO_MEMORY(ctdb, recdata);
1237
1238         recdata->db_id = dbid;
1239
1240         params.ctdb = ctdb;
1241         params.recdata = recdata;
1242         params.len = offsetof(struct ctdb_marshall_buffer, data);
1243         params.failed = false;
1244         params.persistent = persistent;
1245
1246         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1247                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1248                 talloc_free(params.recdata);
1249                 talloc_free(tmp_ctx);
1250                 return -1;
1251         }
1252
1253         if (params.failed) {
1254                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1255                 talloc_free(params.recdata);
1256                 talloc_free(tmp_ctx);
1257                 return -1;              
1258         }
1259
1260         recdata = params.recdata;
1261
1262         outdata.dptr = (void *)recdata;
1263         outdata.dsize = params.len;
1264
1265         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1266         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1267                                         nodes, 0,
1268                                         CONTROL_TIMEOUT(), false, outdata,
1269                                         NULL, NULL,
1270                                         NULL) != 0) {
1271                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1272                 talloc_free(recdata);
1273                 talloc_free(tmp_ctx);
1274                 return -1;
1275         }
1276
1277         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1278                   dbid, recdata->count));
1279
1280         talloc_free(recdata);
1281         talloc_free(tmp_ctx);
1282
1283         return 0;
1284 }
1285
1286
1287 /*
1288   go through a full recovery on one database 
1289  */
1290 static int recover_database(struct ctdb_recoverd *rec, 
1291                             TALLOC_CTX *mem_ctx,
1292                             uint32_t dbid,
1293                             bool persistent,
1294                             uint32_t pnn, 
1295                             struct ctdb_node_map *nodemap,
1296                             uint32_t transaction_id)
1297 {
1298         struct tdb_wrap *recdb;
1299         int ret;
1300         struct ctdb_context *ctdb = rec->ctdb;
1301         TDB_DATA data;
1302         struct ctdb_control_wipe_database w;
1303         uint32_t *nodes;
1304
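        /* Full recovery of one database: merge every node's copy into a local
           scratch tdb, wipe the database on all nodes inside the recovery
           transaction, then push the merged contents back out. */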
1305         recdb = create_recdb(ctdb, mem_ctx);
1306         if (recdb == NULL) {
1307                 return -1;
1308         }
1309
1310         /* pull all remote databases onto the recdb */
1311         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1312         if (ret != 0) {
1313                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1314                 return -1;
1315         }
1316
1317         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1318
1319         /* wipe all the remote databases. This is safe as we are in a transaction */
1320         w.db_id = dbid;
1321         w.transaction_id = transaction_id;
1322
1323         data.dptr = (void *)&w;
1324         data.dsize = sizeof(w);
1325
1326         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1327         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1328                                         nodes, 0,
1329                                         CONTROL_TIMEOUT(), false, data,
1330                                         NULL, NULL,
1331                                         NULL) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1333                 talloc_free(recdb);
1334                 return -1;
1335         }
1336         
1337         /* push out the correct database. This sets the dmaster and skips 
1338            the empty records */
1339         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1340         if (ret != 0) {
1341                 talloc_free(recdb);
1342                 return -1;
1343         }
1344
1345         /* all done with this database */
1346         talloc_free(recdb);
1347
1348         return 0;
1349 }
1350
1351 /*
1352   reload the nodes file 
1353 */
1354 static void reload_nodes_file(struct ctdb_context *ctdb)
1355 {
1356         ctdb->nodes = NULL;
1357         ctdb_load_nodes_file(ctdb);
1358 }
1359
1360 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1361                                          struct ctdb_recoverd *rec,
1362                                          struct ctdb_node_map *nodemap,
1363                                          uint32_t *culprit)
1364 {
1365         int j;
1366         int ret;
1367
1368         if (ctdb->num_nodes != nodemap->num) {
1369                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1370                                   ctdb->num_nodes, nodemap->num));
1371                 if (culprit) {
1372                         *culprit = ctdb->pnn;
1373                 }
1374                 return -1;
1375         }
1376
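        /* For every active node, refresh both views of its public IPs: the
           "known" list (everything configured on the node) and the
           "available" list (addresses the node can currently host), and flag
           a takeover run if the current assignment looks inconsistent. */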
1377         for (j=0; j<nodemap->num; j++) {
1378                 /* release any existing data */
1379                 if (ctdb->nodes[j]->known_public_ips) {
1380                         talloc_free(ctdb->nodes[j]->known_public_ips);
1381                         ctdb->nodes[j]->known_public_ips = NULL;
1382                 }
1383                 if (ctdb->nodes[j]->available_public_ips) {
1384                         talloc_free(ctdb->nodes[j]->available_public_ips);
1385                         ctdb->nodes[j]->available_public_ips = NULL;
1386                 }
1387
1388                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1389                         continue;
1390                 }
1391
1392                 /* grab a new shiny list of known public ips from the node */
1393                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1394                                         CONTROL_TIMEOUT(),
1395                                         ctdb->nodes[j]->pnn,
1396                                         ctdb->nodes,
1397                                         0,
1398                                         &ctdb->nodes[j]->known_public_ips);
1399                 if (ret != 0) {
1400                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1401                                 ctdb->nodes[j]->pnn));
1402                         if (culprit) {
1403                                 *culprit = ctdb->nodes[j]->pnn;
1404                         }
1405                         return -1;
1406                 }
1407
1408                 if (ctdb->do_checkpublicip) {
1409                         if (rec->ip_check_disable_ctx == NULL) {
1410                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1411                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1412                                         rec->need_takeover_run = true;
1413                                 }
1414                         }
1415                 }
1416
1417                 /* also grab the list of available public ips from the node */
1418                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1419                                         CONTROL_TIMEOUT(),
1420                                         ctdb->nodes[j]->pnn,
1421                                         ctdb->nodes,
1422                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1423                                         &ctdb->nodes[j]->available_public_ips);
1424                 if (ret != 0) {
1425                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1426                                 ctdb->nodes[j]->pnn));
1427                         if (culprit) {
1428                                 *culprit = ctdb->nodes[j]->pnn;
1429                         }
1430                         return -1;
1431                 }
1432         }
1433
1434         return 0;
1435 }
1436
1437 /* when we start a recovery, make sure all nodes use the same reclock file
1438    setting
1439 */
1440 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1441 {
1442         struct ctdb_context *ctdb = rec->ctdb;
1443         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1444         TDB_DATA data;
1445         uint32_t *nodes;
1446
1447         if (ctdb->recovery_lock_file == NULL) {
1448                 data.dptr  = NULL;
1449                 data.dsize = 0;
1450         } else {
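                /* include the terminating NUL so the remote nodes receive a complete string */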
1451                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1452                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1453         }
1454
1455         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1456         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1457                                         nodes, 0,
1458                                         CONTROL_TIMEOUT(),
1459                                         false, data,
1460                                         NULL, NULL,
1461                                         rec) != 0) {
1462                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1463                 talloc_free(tmp_ctx);
1464                 return -1;
1465         }
1466
1467         talloc_free(tmp_ctx);
1468         return 0;
1469 }
1470
1471
1472 /*
1473   we are the recmaster, and recovery is needed - start a recovery run
1474  */
1475 static int do_recovery(struct ctdb_recoverd *rec, 
1476                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1477                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1478 {
1479         struct ctdb_context *ctdb = rec->ctdb;
1480         int i, j, ret;
1481         uint32_t generation;
1482         struct ctdb_dbid_map *dbmap;
1483         TDB_DATA data;
1484         uint32_t *nodes;
1485         struct timeval start_time;
1486         uint32_t culprit = (uint32_t)-1;
1487
1488         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1489
1490         /* if recovery fails, force it again */
1491         rec->need_recovery = true;
1492
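        /* ban any node that has caused at least 2*num_nodes recoveries recently and reset its counter */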
1493         for (i=0; i<ctdb->num_nodes; i++) {
1494                 struct ctdb_banning_state *ban_state;
1495
1496                 if (ctdb->nodes[i]->ban_state == NULL) {
1497                         continue;
1498                 }
1499                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1500                 if (ban_state->count < 2*ctdb->num_nodes) {
1501                         continue;
1502                 }
1503                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1504                         ctdb->nodes[i]->pnn, ban_state->count,
1505                         ctdb->tunable.recovery_ban_period));
1506                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1507                 ban_state->count = 0;
1508         }
1509
1510
1511         if (ctdb->tunable.verify_recovery_lock != 0) {
1512                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1513                 start_time = timeval_current();
1514                 if (!ctdb_recovery_lock(ctdb, true)) {
1515                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1516                          "and banning ourselves for %u seconds\n",
1517                                          ctdb->tunable.recovery_ban_period));
1518                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1519                         return -1;
1520                 }
1521                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1522                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1523         }
1524
1525         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1526
1527         /* get a list of all databases */
1528         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1529         if (ret != 0) {
1530                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1531                 return -1;
1532         }
1533
1534         /* we do the db creation before we set the recovery mode, so the freeze happens
1535            on all databases we will be dealing with. */
1536
1537         /* verify that we have all the databases any other node has */
1538         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1539         if (ret != 0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1541                 return -1;
1542         }
1543
1544         /* verify that all other nodes have all our databases */
1545         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1546         if (ret != 0) {
1547                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1548                 return -1;
1549         }
1550         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1551
1552         /* update the database priority for all remote databases */
1553         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1554         if (ret != 0) {
1555                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1556         }
1557         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1558
1559
1560         /* update all other nodes to use the same setting for reclock files
1561            as the local recovery master.
1562         */
1563         sync_recovery_lock_file_across_cluster(rec);
1564
1565         /* set recovery mode to active on all nodes */
1566         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1569                 return -1;
1570         }
1571
1572         /* execute the "startrecovery" event script on all nodes */
1573         ret = run_startrecovery_eventscript(rec, nodemap);
1574         if (ret!=0) {
1575                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1576                 return -1;
1577         }
1578
1579         /*
1580           update all nodes to have the same flags that we have
1581          */
1582         for (i=0;i<nodemap->num;i++) {
1583                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1584                         continue;
1585                 }
1586
1587                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1588                 if (ret != 0) {
1589                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1590                         return -1;
1591                 }
1592         }
1593
1594         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1595
1596         /* pick a new generation number */
1597         generation = new_generation();
1598
1599         /* change the vnnmap on this node to use the new generation 
1600            number but not on any other nodes.
1601            this guarantees that if we abort the recovery prematurely
1602            for some reason (e.g. a node stops responding), we can
1603            just return immediately and we will re-enter recovery
1604            again shortly.
1605            I.e. we deliberately leave the cluster with an inconsistent
1606            generation id to allow us to abort recovery at any stage and
1607            just restart it from scratch.
1608          */
1609         vnnmap->generation = generation;
1610         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1611         if (ret != 0) {
1612                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1613                 return -1;
1614         }
1615
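        /* the new generation number is the payload for the TRANSACTION_START control sent to all active nodes below */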
1616         data.dptr = (void *)&generation;
1617         data.dsize = sizeof(uint32_t);
1618
1619         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1620         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1621                                         nodes, 0,
1622                                         CONTROL_TIMEOUT(), false, data,
1623                                         NULL,
1624                                         transaction_start_fail_callback,
1625                                         rec) != 0) {
1626                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1627                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1628                                         nodes, 0,
1629                                         CONTROL_TIMEOUT(), false, tdb_null,
1630                                         NULL,
1631                                         NULL,
1632                                         NULL) != 0) {
1633                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1634                 }
1635                 return -1;
1636         }
1637
1638         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1639
1640         for (i=0;i<dbmap->num;i++) {
1641                 ret = recover_database(rec, mem_ctx,
1642                                        dbmap->dbs[i].dbid,
1643                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1644                                        pnn, nodemap, generation);
1645                 if (ret != 0) {
1646                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1647                         return -1;
1648                 }
1649         }
1650
1651         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1652
1653         /* commit all the changes */
1654         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1655                                         nodes, 0,
1656                                         CONTROL_TIMEOUT(), false, data,
1657                                         NULL, NULL,
1658                                         NULL) != 0) {
1659                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1660                 return -1;
1661         }
1662
1663         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1664         
1665
1666         /* update the capabilities for all nodes */
1667         ret = update_capabilities(ctdb, nodemap);
1668         if (ret!=0) {
1669                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1670                 return -1;
1671         }
1672
1673         /* build a new vnn map with all the currently active and
1674            unbanned nodes */
1675         generation = new_generation();
1676         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1677         CTDB_NO_MEMORY(ctdb, vnnmap);
1678         vnnmap->generation = generation;
1679         vnnmap->size = 0;
1680         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1681         CTDB_NO_MEMORY(ctdb, vnnmap->map);
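        /* start with an empty map and grow it by one slot for each active node that is capable of being an lmaster */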
1682         for (i=j=0;i<nodemap->num;i++) {
1683                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1684                         continue;
1685                 }
1686                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1687                         /* this node can not be an lmaster */
1688                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1689                         continue;
1690                 }
1691
1692                 vnnmap->size++;
1693                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1694                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1695                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1696
1697         }
1698         if (vnnmap->size == 0) {
1699                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1700                 vnnmap->size++;
1701                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1702                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1703                 vnnmap->map[0] = pnn;
1704         }       
1705
1706         /* update to the new vnnmap on all nodes */
1707         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1708         if (ret != 0) {
1709                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1710                 return -1;
1711         }
1712
1713         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1714
1715         /* update recmaster to point to us for all nodes */
1716         ret = set_recovery_master(ctdb, nodemap, pnn);
1717         if (ret!=0) {
1718                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1719                 return -1;
1720         }
1721
1722         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1723
1724         /*
1725           update all nodes to have the same flags that we have
1726          */
1727         for (i=0;i<nodemap->num;i++) {
1728                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1729                         continue;
1730                 }
1731
1732                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1733                 if (ret != 0) {
1734                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1735                         return -1;
1736                 }
1737         }
1738
1739         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1740
1741         /* disable recovery mode */
1742         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1743         if (ret != 0) {
1744                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1745                 return -1;
1746         }
1747
1748         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1749
1750         /*
1751           tell nodes to takeover their public IPs
1752          */
1753         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1754         if (ret != 0) {
1755                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1756                                  culprit));
1757                 rec->need_takeover_run = true;
1758                 return -1;
1759         }
1760         rec->need_takeover_run = false;
1761         ret = ctdb_takeover_run(ctdb, nodemap);
1762         if (ret != 0) {
1763                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1764                 rec->need_takeover_run = true;
1765         }
1766
1767         /* execute the "recovered" event script on all nodes */
1768         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1769         if (ret!=0) {
1770                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1771                 return -1;
1772         }
1773
1774         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1775
1776         /* send a message to all clients telling them that the cluster 
1777            has been reconfigured */
1778         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1779
1780         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1781
1782         rec->need_recovery = false;
1783
1784         /* we managed to complete a full recovery, make sure to forgive
1785            any past sins by the nodes that could now participate in the
1786            recovery.
1787         */
1788         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1789         for (i=0;i<nodemap->num;i++) {
1790                 struct ctdb_banning_state *ban_state;
1791
1792                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1793                         continue;
1794                 }
1795
1796                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1797                 if (ban_state == NULL) {
1798                         continue;
1799                 }
1800
1801                 ban_state->count = 0;
1802         }
1803
1804
1805         /* We just finished a recovery successfully. 
1806            We now wait for rerecovery_timeout before we allow 
1807            another recovery to take place.
1808         */
1809         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1810         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1811         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1812
1813         return 0;
1814 }
1815
1816
1817 /*
1818   elections are won by first checking the number of connected nodes, then
1819   the priority time, then the pnn
1820  */
1821 struct election_message {
1822         uint32_t num_connected;
1823         struct timeval priority_time;
1824         uint32_t pnn;
1825         uint32_t node_flags;
1826 };
1827
1828 /*
1829   form this node's election data
1830  */
1831 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1832 {
1833         int ret, i;
1834         struct ctdb_node_map *nodemap;
1835         struct ctdb_context *ctdb = rec->ctdb;
1836
1837         ZERO_STRUCTP(em);
1838
1839         em->pnn = rec->ctdb->pnn;
1840         em->priority_time = rec->priority_time;
1841
1842         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1845                 return;
1846         }
1847
1848         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1849         em->node_flags = rec->node_flags;
1850
1851         for (i=0;i<nodemap->num;i++) {
1852                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1853                         em->num_connected++;
1854                 }
1855         }
1856
1857         /* we shouldn't try to win this election if we can't be a recmaster */
1858         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1859                 em->num_connected = 0;
1860                 em->priority_time = timeval_current();
1861         }
1862
1863         talloc_free(nodemap);
1864 }
1865
1866 /*
1867   see if the given election data wins
1868  */
1869 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1870 {
1871         struct election_message myem;
1872         int cmp = 0;
1873
1874         ctdb_election_data(rec, &myem);
1875
1876         /* we can't win if we don't have the recmaster capability */
1877         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1878                 return false;
1879         }
1880
1881         /* we can't win if we are banned */
1882         if (rec->node_flags & NODE_FLAGS_BANNED) {
1883                 return false;
1884         }       
1885
1886         /* we can't win if we are stopped */
1887         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1888                 return false;
1889         }       
1890
1891         /* we will automatically win if the other node is banned */
1892         if (em->node_flags & NODE_FLAGS_BANNED) {
1893                 return true;
1894         }
1895
1896         /* we will automatically win if the other node is stopped */
1897         if (em->node_flags & NODE_FLAGS_STOPPED) {
1898                 return true;
1899         }
1900
1901         /* try to use the most connected node */
1902         if (cmp == 0) {
1903                 cmp = (int)myem.num_connected - (int)em->num_connected;
1904         }
1905
1906         /* then the longest running node */
1907         if (cmp == 0) {
1908                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1909         }
1910
1911         if (cmp == 0) {
1912                 cmp = (int)myem.pnn - (int)em->pnn;
1913         }
1914
1915         return cmp > 0;
1916 }
1917
1918 /*
1919   send out an election request
1920  */
1921 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1922 {
1923         int ret;
1924         TDB_DATA election_data;
1925         struct election_message emsg;
1926         uint64_t srvid;
1927         struct ctdb_context *ctdb = rec->ctdb;
1928
1929         srvid = CTDB_SRVID_RECOVERY;
1930
1931         ctdb_election_data(rec, &emsg);
1932
1933         election_data.dsize = sizeof(struct election_message);
1934         election_data.dptr  = (unsigned char *)&emsg;
1935
1936
1937         /* send an election message to all active nodes */
1938         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1939         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1940
1941
1942         /* A new node that is already frozen has entered the cluster.
1943            The existing nodes are not frozen and don't need to be frozen
1944            until the election has ended and we start the actual recovery
1945         */
1946         if (update_recmaster == true) {
1947                 /* first we assume we will win the election and set
1948                    the recovery master to be ourselves on the current node
1949                  */
1950                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1951                 if (ret != 0) {
1952                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1953                         return -1;
1954                 }
1955         }
1956
1957
1958         return 0;
1959 }
1960
1961 /*
1962   this function will unban all nodes in the cluster
1963 */
1964 static void unban_all_nodes(struct ctdb_context *ctdb)
1965 {
1966         int ret, i;
1967         struct ctdb_node_map *nodemap;
1968         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1969         
1970         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1971         if (ret != 0) {
1972                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1973                 return;
1974         }
1975
1976         for (i=0;i<nodemap->num;i++) {
1977                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1978                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1979                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1980                 }
1981         }
1982
1983         talloc_free(tmp_ctx);
1984 }
1985
1986
1987 /*
1988   we think we are winning the election - send a broadcast election request
1989  */
1990 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1991 {
1992         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1993         int ret;
1994
1995         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1996         if (ret != 0) {
1997                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1998         }
1999
2000         talloc_free(rec->send_election_te);
2001         rec->send_election_te = NULL;
2002 }
2003
2004 /*
2005   handler for memory dumps
2006 */
2007 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2008                              TDB_DATA data, void *private_data)
2009 {
2010         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2011         TDB_DATA *dump;
2012         int ret;
2013         struct rd_memdump_reply *rd;
2014
2015         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2016                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2017                 talloc_free(tmp_ctx);
2018                 return;
2019         }
2020         rd = (struct rd_memdump_reply *)data.dptr;
2021
2022         dump = talloc_zero(tmp_ctx, TDB_DATA);
2023         if (dump == NULL) {
2024                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2025                 talloc_free(tmp_ctx);
2026                 return;
2027         }
2028         ret = ctdb_dump_memory(ctdb, dump);
2029         if (ret != 0) {
2030                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2031                 talloc_free(tmp_ctx);
2032                 return;
2033         }
2034
2035         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2036
2037         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2040                 talloc_free(tmp_ctx);
2041                 return;
2042         }
2043
2044         talloc_free(tmp_ctx);
2045 }
2046
2047 /*
2048   handler for reload_nodes
2049 */
2050 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2051                              TDB_DATA data, void *private_data)
2052 {
2053         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2054
2055         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2056
2057         reload_nodes_file(rec->ctdb);
2058 }
2059
2060
2061 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2062                               struct timeval yt, void *p)
2063 {
2064         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2065
2066         talloc_free(rec->ip_check_disable_ctx);
2067         rec->ip_check_disable_ctx = NULL;
2068 }
2069
2070
2071 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2072                                   struct timeval t, void *p)
2073 {
2074         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2075         struct ctdb_context *ctdb = rec->ctdb;
2076         int ret;
2077
2078         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2079
2080         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2083                 rec->need_takeover_run = true;
2084         }
2085
2086         talloc_free(rec->deferred_rebalance_ctx);
2087         rec->deferred_rebalance_ctx = NULL;
2088 }
2089
2090         
2091 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2092                              TDB_DATA data, void *private_data)
2093 {
2094         uint32_t pnn;
2095         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2096
2097         if (data.dsize != sizeof(uint32_t)) {
2098                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(uint32_t)));
2099                 return;
2100         }
2101
2102         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2103                 return;
2104         }
2105
2106         pnn = *(uint32_t *)&data.dptr[0];
2107
2108         lcp2_forcerebalance(ctdb, pnn);
2109         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2110
2111         if (rec->deferred_rebalance_ctx != NULL) {
2112                 talloc_free(rec->deferred_rebalance_ctx);
2113         }
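        /* (re)arm the deferred rebalance timer; a new request replaces any pending one */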
2114         rec->deferred_rebalance_ctx = talloc_new(rec);
2115         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2116                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2117                         ctdb_rebalance_timeout, rec);
2118 }
2119
2120
2121
2122 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2123                              TDB_DATA data, void *private_data)
2124 {
2125         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2126         struct ctdb_public_ip *ip;
2127
2128         if (rec->recmaster != rec->ctdb->pnn) {
2129                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2130                 return;
2131         }
2132
2133         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2134                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2135                 return;
2136         }
2137
2138         ip = (struct ctdb_public_ip *)data.dptr;
2139
2140         update_ip_assignment_tree(rec->ctdb, ip);
2141 }
2142
2143
2144 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2145                              TDB_DATA data, void *private_data)
2146 {
2147         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2148         uint32_t timeout;
2149
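        /* a new disable request cancels any re-enable timer that is still pending */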
2150         if (rec->ip_check_disable_ctx != NULL) {
2151                 talloc_free(rec->ip_check_disable_ctx);
2152                 rec->ip_check_disable_ctx = NULL;
2153         }
2154
2155         if (data.dsize != sizeof(uint32_t)) {
2156                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2157                                  "expecting %lu\n", (long unsigned)data.dsize,
2158                                  (long unsigned)sizeof(uint32_t)));
2159                 return;
2160         }
2161         if (data.dptr == NULL) {
2162                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2163                 return;
2164         }
2165
2166         timeout = *((uint32_t *)data.dptr);
2167         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2168
2169         rec->ip_check_disable_ctx = talloc_new(rec);
2170         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2171
2172         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2173 }
2174
2175
2176 /*
2177   handler for ip reallocate, just add it to the list of callers and 
2178   handle this later in the monitor_cluster loop so we do not recurse
2179   with other callers to takeover_run()
2180 */
2181 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2182                              TDB_DATA data, void *private_data)
2183 {
2184         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2185         struct ip_reallocate_list *caller;
2186
2187         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2188                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2189                 return;
2190         }
2191
2192         if (rec->ip_reallocate_ctx == NULL) {
2193                 rec->ip_reallocate_ctx = talloc_new(rec);
2194                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2195         }
2196
2197         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2198         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2199
2200         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2201         caller->next = rec->reallocate_callers;
2202         rec->reallocate_callers = caller;
2203
2204         return;
2205 }
2206
2207 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2208 {
2209         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2210         TDB_DATA result;
2211         int32_t ret;
2212         struct ip_reallocate_list *callers;
2213         uint32_t culprit;
2214
2215         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2216
2217         /* update the list of public ips that a node can handle for
2218            all connected nodes
2219         */
2220         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2221         if (ret != 0) {
2222                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2223                                  culprit));
2224                 rec->need_takeover_run = true;
2225         }
2226         if (ret == 0) {
2227                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2228                 if (ret != 0) {
2229                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2230                         rec->need_takeover_run = true;
2231                 }
2232         }
2233
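        /* pack the overall status (0 on success) so each caller learns how the reallocation went */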
2234         result.dsize = sizeof(int32_t);
2235         result.dptr  = (uint8_t *)&ret;
2236
2237         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2238
2239                 /* Someone that sent srvid==0 does not want a reply */
2240                 if (callers->rd->srvid == 0) {
2241                         continue;
2242                 }
2243                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2244                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2245                                   (unsigned long long)callers->rd->srvid));
2246                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2247                 if (ret != 0) {
2248                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2249                                          "message to %u:%llu\n",
2250                                          (unsigned)callers->rd->pnn,
2251                                          (unsigned long long)callers->rd->srvid));
2252                 }
2253         }
2254
2255         talloc_free(tmp_ctx);
2256         talloc_free(rec->ip_reallocate_ctx);
2257         rec->ip_reallocate_ctx = NULL;
2258         rec->reallocate_callers = NULL;
2259         
2260 }
2261
2262
2263 /*
2264   handler for recovery master elections
2265 */
2266 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2267                              TDB_DATA data, void *private_data)
2268 {
2269         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2270         int ret;
2271         struct election_message *em = (struct election_message *)data.dptr;
2272         TALLOC_CTX *mem_ctx;
2273
2274         /* we got an election packet - update the timeout for the election */
2275         talloc_free(rec->election_timeout);
2276         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2277                                                 fast_start ?
2278                                                 timeval_current_ofs(0, 500000) :
2279                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2280                                                 ctdb_election_timeout, rec);
2281
2282         mem_ctx = talloc_new(ctdb);
2283
2284         /* someone called an election. check their election data
2285            and if we disagree and we would rather be the elected node, 
2286            send a new election message to all other nodes
2287          */
2288         if (ctdb_election_win(rec, em)) {
2289                 if (!rec->send_election_te) {
2290                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2291                                                                 timeval_current_ofs(0, 500000),
2292                                                                 election_send_request, rec);
2293                 }
2294                 talloc_free(mem_ctx);
2295                 /*unban_all_nodes(ctdb);*/
2296                 return;
2297         }
2298         
2299         /* we didn't win */
2300         talloc_free(rec->send_election_te);
2301         rec->send_election_te = NULL;
2302
2303         if (ctdb->tunable.verify_recovery_lock != 0) {
2304                 /* release the recmaster lock */
2305                 if (em->pnn != ctdb->pnn &&
2306                     ctdb->recovery_lock_fd != -1) {
2307                         close(ctdb->recovery_lock_fd);
2308                         ctdb->recovery_lock_fd = -1;
2309                         unban_all_nodes(ctdb);
2310                 }
2311         }
2312
2313         /* ok, let that node become recmaster then */
2314         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2315         if (ret != 0) {
2316                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2317                 talloc_free(mem_ctx);
2318                 return;
2319         }
2320
2321         talloc_free(mem_ctx);
2322         return;
2323 }
2324
2325
2326 /*
2327   force the start of the election process
2328  */
2329 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2330                            struct ctdb_node_map *nodemap)
2331 {
2332         int ret;
2333         struct ctdb_context *ctdb = rec->ctdb;
2334
2335         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2336
2337         /* set all nodes to recovery mode to stop all internode traffic */
2338         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2339         if (ret != 0) {
2340                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2341                 return;
2342         }
2343
2344         talloc_free(rec->election_timeout);
2345         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2346                                                 fast_start ?
2347                                                 timeval_current_ofs(0, 500000) :
2348                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2349                                                 ctdb_election_timeout, rec);
2350
2351         ret = send_election_request(rec, pnn, true);
2352         if (ret!=0) {
2353                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2354                 return;
2355         }
2356
2357         /* wait for a few seconds to collect all responses */
2358         ctdb_wait_election(rec);
2359 }
2360
2361
2362
2363 /*
2364   handler for when a node changes its flags
2365 */
2366 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2367                             TDB_DATA data, void *private_data)
2368 {
2369         int ret;
2370         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2371         struct ctdb_node_map *nodemap=NULL;
2372         TALLOC_CTX *tmp_ctx;
2373         int i;
2374         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2375         int disabled_flag_changed;
2376
2377         if (data.dsize != sizeof(*c)) {
2378                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2379                 return;
2380         }
2381
2382         tmp_ctx = talloc_new(ctdb);
2383         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2384
2385         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2386         if (ret != 0) {
2387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2388                 talloc_free(tmp_ctx);
2389                 return;         
2390         }
2391
2392
2393         for (i=0;i<nodemap->num;i++) {
2394                 if (nodemap->nodes[i].pnn == c->pnn) break;
2395         }
2396
2397         if (i == nodemap->num) {
2398                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2399                 talloc_free(tmp_ctx);
2400                 return;
2401         }
2402
2403         if (nodemap->nodes[i].flags != c->new_flags) {
2404                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2405         }
2406
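        /* remember whether the DISABLED flags changed; only such a change requires a takeover run (see below) */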
2407         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2408
2409         nodemap->nodes[i].flags = c->new_flags;
2410
2411         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2412                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2413
2414         if (ret == 0) {
2415                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2416                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2417         }
2418         
2419         if (ret == 0 &&
2420             ctdb->recovery_master == ctdb->pnn &&
2421             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2422                 /* Only do the takeover run if the perm disabled or unhealthy
2423                    flags changed since these will cause an ip failover but not
2424                    a recovery.
2425                    If the node became disconnected or banned this will also
2426                    lead to an ip address failover but that is handled 
2427                    during recovery
2428                 */
2429                 if (disabled_flag_changed) {
2430                         rec->need_takeover_run = true;
2431                 }
2432         }
2433
2434         talloc_free(tmp_ctx);
2435 }
2436
2437 /*
2438   handler for when we need to push out flag changes to all other nodes
2439 */
2440 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2441                             TDB_DATA data, void *private_data)
2442 {
2443         int ret;
2444         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2445         struct ctdb_node_map *nodemap=NULL;
2446         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2447         uint32_t recmaster;
2448         uint32_t *nodes;
2449
2450         /* find the recovery master */
2451         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2452         if (ret != 0) {
2453                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2454                 talloc_free(tmp_ctx);
2455                 return;
2456         }
2457
2458         /* read the node flags from the recmaster */
2459         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2460         if (ret != 0) {
2461                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2462                 talloc_free(tmp_ctx);
2463                 return;
2464         }
2465         if (c->pnn >= nodemap->num) {
2466                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2467                 talloc_free(tmp_ctx);
2468                 return;
2469         }
2470
2471         /* send the flags update to all connected nodes */
2472         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2473
2474         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2475                                       nodes, 0, CONTROL_TIMEOUT(),
2476                                       false, data,
2477                                       NULL, NULL,
2478                                       NULL) != 0) {
2479                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2480
2481                 talloc_free(tmp_ctx);
2482                 return;
2483         }
2484
2485         talloc_free(tmp_ctx);
2486 }
2487
2488
2489 struct verify_recmode_normal_data {
2490         uint32_t count;
2491         enum monitor_result status;
2492 };
2493
2494 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2495 {
2496         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2497
2498
2499         /* one more node has responded with recmode data*/
2500         rmdata->count--;
2501
2502         /* if we failed to get the recmode, then return an error and let
2503            the main loop try again.
2504         */
2505         if (state->state != CTDB_CONTROL_DONE) {
2506                 if (rmdata->status == MONITOR_OK) {
2507                         rmdata->status = MONITOR_FAILED;
2508                 }
2509                 return;
2510         }
2511
2512         /* if we got a response, then the recmode will be stored in the
2513            status field
2514         */
2515         if (state->status != CTDB_RECOVERY_NORMAL) {
2516                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2517                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2518         }
2519
2520         return;
2521 }
2522
2523
2524 /* verify that all nodes are in normal recovery mode */
2525 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2526 {
2527         struct verify_recmode_normal_data *rmdata;
2528         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2529         struct ctdb_client_control_state *state;
2530         enum monitor_result status;
2531         int j;
2532         
2533         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2534         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2535         rmdata->count  = 0;
2536         rmdata->status = MONITOR_OK;
2537
2538         /* loop over all active nodes and send an async getrecmode call to 
2539            them*/
2540         for (j=0; j<nodemap->num; j++) {
2541                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2542                         continue;
2543                 }
2544                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2545                                         CONTROL_TIMEOUT(), 
2546                                         nodemap->nodes[j].pnn);
2547                 if (state == NULL) {
2548                         /* we failed to send the control, treat this as 
2549                            an error and try again next iteration
2550                         */                      
2551                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2552                         talloc_free(mem_ctx);
2553                         return MONITOR_FAILED;
2554                 }
2555
2556                 /* set up the callback functions */
2557                 state->async.fn = verify_recmode_normal_callback;
2558                 state->async.private_data = rmdata;
2559
2560                 /* one more control to wait for to complete */
2561                 rmdata->count++;
2562         }
2563
2564
2565         /* now wait for up to the maximum number of seconds allowed
2566            or until all nodes we expect a response from have replied
2567         */
2568         while (rmdata->count > 0) {
2569                 event_loop_once(ctdb->ev);
2570         }
2571
2572         status = rmdata->status;
2573         talloc_free(mem_ctx);
2574         return status;
2575 }
2576
2577
2578 struct verify_recmaster_data {
2579         struct ctdb_recoverd *rec;
2580         uint32_t count;
2581         uint32_t pnn;
2582         enum monitor_result status;
2583 };
2584
2585 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2586 {
2587         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2588
2589
2590         /* one more node has responded with recmaster data*/
2591         rmdata->count--;
2592
2593         /* if we failed to get the recmaster, then return an error and let
2594            the main loop try again.
2595         */
2596         if (state->state != CTDB_CONTROL_DONE) {
2597                 if (rmdata->status == MONITOR_OK) {
2598                         rmdata->status = MONITOR_FAILED;
2599                 }
2600                 return;
2601         }
2602
2603         /* if we got a response, then the recmaster will be stored in the
2604            status field
2605         */
2606         if (state->status != rmdata->pnn) {
2607                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2608                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2609                 rmdata->status = MONITOR_ELECTION_NEEDED;
2610         }
2611
2612         return;
2613 }
2614
2615
2616 /* verify that all nodes agree that we are the recmaster */
2617 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2618 {
2619         struct ctdb_context *ctdb = rec->ctdb;
2620         struct verify_recmaster_data *rmdata;
2621         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2622         struct ctdb_client_control_state *state;
2623         enum monitor_result status;
2624         int j;
2625         
2626         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2627         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2628         rmdata->rec    = rec;
2629         rmdata->count  = 0;
2630         rmdata->pnn    = pnn;
2631         rmdata->status = MONITOR_OK;
2632
2633         /* loop over all active nodes and send an async getrecmaster call to 
2634            them*/
2635         for (j=0; j<nodemap->num; j++) {
2636                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2637                         continue;
2638                 }
2639                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2640                                         CONTROL_TIMEOUT(),
2641                                         nodemap->nodes[j].pnn);
2642                 if (state == NULL) {
2643                         /* we failed to send the control, treat this as 
2644                            an error and try again next iteration
2645                         */                      
2646                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2647                         talloc_free(mem_ctx);
2648                         return MONITOR_FAILED;
2649                 }
2650
2651                 /* set up the callback functions */
2652                 state->async.fn = verify_recmaster_callback;
2653                 state->async.private_data = rmdata;
2654
2655                 /* one more control to wait for to complete */
2656                 rmdata->count++;
2657         }
2658
2659
2660         /* now wait for up to the maximum number of seconds allowed
2661            or until all nodes we expect a response from have replied
2662         */
2663         while (rmdata->count > 0) {
2664                 event_loop_once(ctdb->ev);
2665         }
2666
2667         status = rmdata->status;
2668         talloc_free(mem_ctx);
2669         return status;
2670 }
2671
2672
2673 /* called to check that the local allocation of public ip addresses is ok.
2674 */
2675 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2676 {
2677         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2678         struct ctdb_control_get_ifaces *ifaces = NULL;
2679         struct ctdb_all_public_ips *ips = NULL;
2680         struct ctdb_uptime *uptime1 = NULL;
2681         struct ctdb_uptime *uptime2 = NULL;
2682         int ret, j;
2683         bool need_iface_check = false;
2684         bool need_takeover_run = false;
2685
2686         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2687                                 CTDB_CURRENT_NODE, &uptime1);
2688         if (ret != 0) {
2689                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2690                 talloc_free(mem_ctx);
2691                 return -1;
2692         }
2693
2694
2695         /* read the interfaces from the local node */
2696         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2697         if (ret != 0) {
2698                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2699                 talloc_free(mem_ctx);
2700                 return -1;
2701         }
2702
2703         if (!rec->ifaces) {
2704                 need_iface_check = true;
2705         } else if (rec->ifaces->num != ifaces->num) {
2706                 need_iface_check = true;
2707         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2708                 need_iface_check = true;
2709         }
2710
2711         if (need_iface_check) {
2712                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2713                                      "local node %u - force takeover run\n",
2714                                      pnn));
2715                 need_takeover_run = true;
2716         }
2717
2718         /* read the ip allocation from the local node */
2719         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2720         if (ret != 0) {
2721                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2722                 talloc_free(mem_ctx);
2723                 return -1;
2724         }
2725
2726         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2727                                 CTDB_CURRENT_NODE, &uptime2);
2728         if (ret != 0) {
2729                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2730                 talloc_free(mem_ctx);
2731                 return -1;
2732         }
2733
2734         /* skip the check if the startrecovery time has changed */
2735         if (timeval_compare(&uptime1->last_recovery_started,
2736                             &uptime2->last_recovery_started) != 0) {
2737                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2738                 talloc_free(mem_ctx);
2739                 return 0;
2740         }
2741
2742         /* skip the check if the endrecovery time has changed */
2743         if (timeval_compare(&uptime1->last_recovery_finished,
2744                             &uptime2->last_recovery_finished) != 0) {
2745                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2746                 talloc_free(mem_ctx);
2747                 return 0;
2748         }
2749
2750         /* skip the check if we have started but not finished recovery */
2751         if (timeval_compare(&uptime1->last_recovery_finished,
2752                             &uptime1->last_recovery_started) != 1) {
2753                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2754                 talloc_free(mem_ctx);
2755
2756                 return 0;
2757         }
2758
2759         talloc_free(rec->ifaces);
2760         rec->ifaces = talloc_steal(rec, ifaces);
2761
2762         /* verify that we have the ip addresses we should have
2763            and that we don't have ones we shouldn't have.
2764            if we find an inconsistency we set recmode to
2765            active on the local node and wait for the recmaster
2766            to do a full blown recovery.
2767            also, if the pnn is -1 and we are healthy and can host the ip,
2768            we request an ip reallocation.
2769         */
2770         if (ctdb->tunable.disable_ip_failover == 0) {
2771                 for (j=0; j<ips->num; j++) {
2772                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2773                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2774                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2775                                 need_takeover_run = true;
2776                         } else if (ips->ips[j].pnn == pnn) {
2777                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2778                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2779                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2780                                         need_takeover_run = true;
2781                                 }
2782                         } else {
2783                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2784                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2785                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2786                                         need_takeover_run = true;
2787                                 }
2788                         }
2789                 }
2790         }
2791
2792         if (need_takeover_run) {
2793                 struct takeover_run_reply rd;
2794                 TDB_DATA data;
2795
2796                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2797
2798                 rd.pnn = ctdb->pnn;
2799                 rd.srvid = 0;
2800                 data.dptr = (uint8_t *)&rd;
2801                 data.dsize = sizeof(rd);
2802
2803                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2804                 if (ret != 0) {
2805                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2806                 }
2807         }
2808         talloc_free(mem_ctx);
2809         return 0;
2810 }
2811
2812
2813 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2814 {
2815         struct ctdb_node_map **remote_nodemaps = callback_data;
2816
2817         if (node_pnn >= ctdb->num_nodes) {
2818                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2819                 return;
2820         }
2821
2822         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2823
2824 }
2825
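     /* Fetch the nodemap from the active nodes in parallel; each reply is
        stored in remote_nodemaps[], indexed by the replying node's pnn, by
        async_getnodemap_callback() above. */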
2826 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2827         struct ctdb_node_map *nodemap,
2828         struct ctdb_node_map **remote_nodemaps)
2829 {
2830         uint32_t *nodes;
2831
2832         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2833         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2834                                         nodes, 0,
2835                                         CONTROL_TIMEOUT(), false, tdb_null,
2836                                         async_getnodemap_callback,
2837                                         NULL,
2838                                         remote_nodemaps) != 0) {
2839                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2840
2841                 return -1;
2842         }
2843
2844         return 0;
2845 }
2846
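     /* State for the recovery-lock check: a forked child probes the reclock
        file and reports back over a pipe, while the parent sits in the event
        loop until the status leaves RECLOCK_CHECKING (set to RECLOCK_OK or
        RECLOCK_FAILED by the pipe handler, or RECLOCK_TIMEOUT by a timer). */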
2847 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2848 struct ctdb_check_reclock_state {
2849         struct ctdb_context *ctdb;
2850         struct timeval start_time;
2851         int fd[2];
2852         pid_t child;
2853         struct timed_event *te;
2854         struct fd_event *fde;
2855         enum reclock_child_status status;
2856 };
2857
2858 /* when we free the reclock state we must kill any child process.
2859 */
2860 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2861 {
2862         struct ctdb_context *ctdb = state->ctdb;
2863
2864         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2865
2866         if (state->fd[0] != -1) {
2867                 close(state->fd[0]);
2868                 state->fd[0] = -1;
2869         }
2870         if (state->fd[1] != -1) {
2871                 close(state->fd[1]);
2872                 state->fd[1] = -1;
2873         }
2874         kill(state->child, SIGKILL);
2875         return 0;
2876 }
2877
2878 /*
2879   called if our check_reclock child times out. this would happen if
2880   i/o to the reclock file blocks.
2881  */
2882 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2883                                          struct timeval t, void *private_data)
2884 {
2885         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2886                                            struct ctdb_check_reclock_state);
2887
2888         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2889         state->status = RECLOCK_TIMEOUT;
2890 }
2891
2892 /* this is called when the child process has completed checking the reclock
2893    file and has written data back to us through the pipe.
2894 */
2895 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2896                              uint16_t flags, void *private_data)
2897 {
2898         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2899                                              struct ctdb_check_reclock_state);
2900         char c = 0;
2901         int ret;
2902
2903         /* we got a response from our child process so we can abort the
2904            timeout.
2905         */
2906         talloc_free(state->te);
2907         state->te = NULL;
2908
2909         ret = read(state->fd[0], &c, 1);
2910         if (ret != 1 || c != RECLOCK_OK) {
2911                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2912                 state->status = RECLOCK_FAILED;
2913
2914                 return;
2915         }
2916
2917         state->status = RECLOCK_OK;
2918         return;
2919 }
2920
2921 static int check_recovery_lock(struct ctdb_context *ctdb)
2922 {
2923         int ret;
2924         struct ctdb_check_reclock_state *state;
2925         pid_t parent = getpid();
2926
2927         if (ctdb->recovery_lock_fd == -1) {
2928                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2929                 return -1;
2930         }
2931
2932         state = talloc(ctdb, struct ctdb_check_reclock_state);
2933         CTDB_NO_MEMORY(ctdb, state);
2934
2935         state->ctdb = ctdb;
2936         state->start_time = timeval_current();
2937         state->status = RECLOCK_CHECKING;
2938         state->fd[0] = -1;
2939         state->fd[1] = -1;
2940
2941         ret = pipe(state->fd);
2942         if (ret != 0) {
2943                 talloc_free(state);
2944                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2945                 return -1;
2946         }
2947
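              /* The child forked below does a blocking pread() on the recovery
                 lock file and writes a single status byte (RECLOCK_OK or
                 RECLOCK_FAILED) back through the pipe.  Doing the read in a
                 child means a hung cluster filesystem can only stall the
                 child; the parent's 15 second timer (set up further down)
                 turns such a stall into RECLOCK_TIMEOUT. */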
2948         state->child = ctdb_fork(ctdb);
2949         if (state->child == (pid_t)-1) {
2950                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2951                 close(state->fd[0]);
2952                 state->fd[0] = -1;
2953                 close(state->fd[1]);
2954                 state->fd[1] = -1;
2955                 talloc_free(state);
2956                 return -1;
2957         }
2958
2959         if (state->child == 0) {
2960                 char cc = RECLOCK_OK;
2961                 close(state->fd[0]);
2962                 state->fd[0] = -1;
2963
2964                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2965                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2966                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2967                         cc = RECLOCK_FAILED;
2968                 }
2969
2970                 write(state->fd[1], &cc, 1);
2971                 /* make sure we die when our parent dies */
2972                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2973                         sleep(5);
2974                         write(state->fd[1], &cc, 1);
2975                 }
2976                 _exit(0);
2977         }
2978         close(state->fd[1]);
2979         state->fd[1] = -1;
2980         set_close_on_exec(state->fd[0]);
2981
2982         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2983
2984         talloc_set_destructor(state, check_reclock_destructor);
2985
2986         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2987                                     ctdb_check_reclock_timeout, state);
2988         if (state->te == NULL) {
2989                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2990                 talloc_free(state);
2991                 return -1;
2992         }
2993
2994         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2995                                 EVENT_FD_READ,
2996                                 reclock_child_handler,
2997                                 (void *)state);
2998
2999         if (state->fde == NULL) {
3000                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3001                 talloc_free(state);
3002                 return -1;
3003         }
3004         tevent_fd_set_auto_close(state->fde);
3005
3006         while (state->status == RECLOCK_CHECKING) {
3007                 event_loop_once(ctdb->ev);
3008         }
3009
3010         if (state->status == RECLOCK_FAILED) {
3011                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3012                 close(ctdb->recovery_lock_fd);
3013                 ctdb->recovery_lock_fd = -1;
3014                 talloc_free(state);
3015                 return -1;
3016         }
3017
3018         talloc_free(state);
3019         return 0;
3020 }
3021
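     /* Re-read the reclock file setting from the local daemon and, if it has
        changed (enabled, disabled or repointed), drop the old file descriptor
        so that the lock is re-taken on the right file. */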
3022 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3023 {
3024         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3025         const char *reclockfile;
3026
3027         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3028                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3029                 talloc_free(tmp_ctx);
3030                 return -1;      
3031         }
3032
3033         if (reclockfile == NULL) {
3034                 if (ctdb->recovery_lock_file != NULL) {
3035                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3036                         talloc_free(ctdb->recovery_lock_file);
3037                         ctdb->recovery_lock_file = NULL;
3038                         if (ctdb->recovery_lock_fd != -1) {
3039                                 close(ctdb->recovery_lock_fd);
3040                                 ctdb->recovery_lock_fd = -1;
3041                         }
3042                 }
3043                 ctdb->tunable.verify_recovery_lock = 0;
3044                 talloc_free(tmp_ctx);
3045                 return 0;
3046         }
3047
3048         if (ctdb->recovery_lock_file == NULL) {
3049                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3050                 if (ctdb->recovery_lock_fd != -1) {
3051                         close(ctdb->recovery_lock_fd);
3052                         ctdb->recovery_lock_fd = -1;
3053                 }
3054                 talloc_free(tmp_ctx);
3055                 return 0;
3056         }
3057
3058
3059         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3060                 talloc_free(tmp_ctx);
3061                 return 0;
3062         }
3063
3064         talloc_free(ctdb->recovery_lock_file);
3065         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3066         ctdb->tunable.verify_recovery_lock = 0;
3067         if (ctdb->recovery_lock_fd != -1) {
3068                 close(ctdb->recovery_lock_fd);
3069                 ctdb->recovery_lock_fd = -1;
3070         }
3071
3072         talloc_free(tmp_ctx);
3073         return 0;
3074 }
3075
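     /* One iteration of the recovery daemon's monitoring logic.  The early
        checks (local daemon alive, tunables, reclock file, nodemap, current
        recmaster) run on every node; everything from update_local_flags()
        onwards runs only on the recovery master and may end in a forced
        election or in do_recovery(). */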
3076 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3077                       TALLOC_CTX *mem_ctx)
3078 {
3079         uint32_t pnn;
3080         struct ctdb_node_map *nodemap=NULL;
3081         struct ctdb_node_map *recmaster_nodemap=NULL;
3082         struct ctdb_node_map **remote_nodemaps=NULL;
3083         struct ctdb_vnn_map *vnnmap=NULL;
3084         struct ctdb_vnn_map *remote_vnnmap=NULL;
3085         int32_t debug_level;
3086         int i, j, ret;
3087
3088
3089
3090         /* verify that the main daemon is still running */
3091         if (kill(ctdb->ctdbd_pid, 0) != 0) {
3092                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3093                 exit(-1);
3094         }
3095
3096         /* ping the local daemon to tell it we are alive */
3097         ctdb_ctrl_recd_ping(ctdb);
3098
3099         if (rec->election_timeout) {
3100                 /* an election is in progress */
3101                 return;
3102         }
3103
3104         /* read the debug level from the parent and update locally */
3105         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3106         if (ret !=0) {
3107                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3108                 return;
3109         }
3110         LogLevel = debug_level;
3111
3112
3113         /* We must check if we need to ban a node here but we want to do this
3114            as early as possible so we don't wait until we have pulled the node
3115            map from the local node. That's why we use the hardcoded value 20.
3116         */
3117         for (i=0; i<ctdb->num_nodes; i++) {
3118                 struct ctdb_banning_state *ban_state;
3119
3120                 if (ctdb->nodes[i]->ban_state == NULL) {
3121                         continue;
3122                 }
3123                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3124                 if (ban_state->count < 20) {
3125                         continue;
3126                 }
3127                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3128                         ctdb->nodes[i]->pnn, ban_state->count,
3129                         ctdb->tunable.recovery_ban_period));
3130                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3131                 ban_state->count = 0;
3132         }
3133
3134         /* get relevant tunables */
3135         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3136         if (ret != 0) {
3137                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3138                 return;
3139         }
3140
3141         /* get the current recovery lock file from the server */
3142         if (update_recovery_lock_file(ctdb) != 0) {
3143                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3144                 return;
3145         }
3146
3147         /* Make sure that if recovery lock verification becomes disabled,
3148            we close the recovery lock file.
3149         */
3150         if (ctdb->tunable.verify_recovery_lock == 0) {
3151                 if (ctdb->recovery_lock_fd != -1) {
3152                         close(ctdb->recovery_lock_fd);
3153                         ctdb->recovery_lock_fd = -1;
3154                 }
3155         }
3156
3157         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3158         if (pnn == (uint32_t)-1) {
3159                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3160                 return;
3161         }
3162
3163         /* get the vnnmap */
3164         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3165         if (ret != 0) {
3166                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3167                 return;
3168         }
3169
3170
3171         /* get number of nodes */
3172         if (rec->nodemap) {
3173                 talloc_free(rec->nodemap);
3174                 rec->nodemap = NULL;
3175                 nodemap=NULL;
3176         }
3177         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3178         if (ret != 0) {
3179                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3180                 return;
3181         }
3182         nodemap = rec->nodemap;
3183
3184         /* update the capabilities for all nodes */
3185         ret = update_capabilities(ctdb, nodemap);
3186         if (ret != 0) {
3187                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3188                 return;
3189         }
3190
3191         /* check which node is the recovery master */
3192         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3193         if (ret != 0) {
3194                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3195                 return;
3196         }
3197
3198         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3199         if (rec->recmaster != pnn) {
3200                 if (rec->ip_reallocate_ctx != NULL) {
3201                         talloc_free(rec->ip_reallocate_ctx);
3202                         rec->ip_reallocate_ctx = NULL;
3203                         rec->reallocate_callers = NULL;
3204                 }
3205         }
3206
3207         if (rec->recmaster == (uint32_t)-1) {
3208                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3209                 force_election(rec, pnn, nodemap);
3210                 return;
3211         }
3212
3213         /* if the local daemon is STOPPED, we verify that the databases are
3214            also frozen and that the recmode is set to active
3215         */
3216         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3217                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3218                 if (ret != 0) {
3219                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3220                 }
3221                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3222                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3223
3224                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3225                         if (ret != 0) {
3226                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3227                                 return;
3228                         }
3229                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3230                         if (ret != 0) {
3231                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3232
3233                                 return;
3234                         }
3235                         return;
3236                 }
3237         }
3238         /* If the local node is stopped, verify that we are not the
3239            recmaster and, if we are, yield that role
3240         */
3241         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3242                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3243                 force_election(rec, pnn, nodemap);
3244                 return;
3245         }
3246         
3247         /*
3248          * if the current recmaster does not have CTDB_CAP_RECMASTER
3249          * but we do, force an election and try to become the new
3250          * recmaster
3251          */
3252         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3253             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3254              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3255                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3256                                   " but we (node %u) have - force an election\n",
3257                                   rec->recmaster, pnn));
3258                 force_election(rec, pnn, nodemap);
3259                 return;
3260         }
3261
3262         /* check that we (recovery daemon) and the local ctdb daemon
3263            agree on whether we are banned or not
3264         */
3265 /* TODO: this check is not implemented yet */
3266
3267         /* remember our own node flags */
3268         rec->node_flags = nodemap->nodes[pnn].flags;
3269
3270         /* count how many active nodes there are */
3271         rec->num_active    = 0;
3272         rec->num_connected = 0;
3273         for (i=0; i<nodemap->num; i++) {
3274                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3275                         rec->num_active++;
3276                 }
3277                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3278                         rec->num_connected++;
3279                 }
3280         }
3281
3282
3283         /* verify that the recmaster node is still active */
3284         for (j=0; j<nodemap->num; j++) {
3285                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3286                         break;
3287                 }
3288         }
3289
3290         if (j == nodemap->num) {
3291                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3292                 force_election(rec, pnn, nodemap);
3293                 return;
3294         }
3295
3296         /* if recovery master is disconnected we must elect a new recmaster */
3297         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3298                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3299                 force_election(rec, pnn, nodemap);
3300                 return;
3301         }
3302
3303         /* grab the nodemap from the recovery master to check if it is banned */
3304         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3305                                    mem_ctx, &recmaster_nodemap);
3306         if (ret != 0) {
3307                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3308                           nodemap->nodes[j].pnn));
3309                 return;
3310         }
3311
3312
3313         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3314                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3315                 force_election(rec, pnn, nodemap);
3316                 return;
3317         }
3318
3319
3320         /* verify that we have all ip addresses we should have and we don't
3321          * have addresses we shouldn't have.
3322          */
3323         if (ctdb->tunable.disable_ip_failover == 0) {
3324                 if (rec->ip_check_disable_ctx == NULL) {
3325                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3326                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3327                         }
3328                 }
3329         }
3330
3331
3332         /* if we are not the recmaster then we do not need to check
3333            if recovery is needed
3334          */
3335         if (pnn != rec->recmaster) {
3336                 return;
3337         }
3338
3339
3340         /* ensure our local copies of flags are right */
3341         ret = update_local_flags(rec, nodemap);
3342         if (ret == MONITOR_ELECTION_NEEDED) {
3343                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3344                 force_election(rec, pnn, nodemap);
3345                 return;
3346         }
3347         if (ret != MONITOR_OK) {
3348                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3349                 return;
3350         }
3351
3352         if (ctdb->num_nodes != nodemap->num) {
3353                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3354                 reload_nodes_file(ctdb);
3355                 return;
3356         }
3357
3358         /* verify that all active nodes agree that we are the recmaster */
3359         switch (verify_recmaster(rec, nodemap, pnn)) {
3360         case MONITOR_RECOVERY_NEEDED:
3361                 /* can not happen */
3362                 return;
3363         case MONITOR_ELECTION_NEEDED:
3364                 force_election(rec, pnn, nodemap);
3365                 return;
3366         case MONITOR_OK:
3367                 break;
3368         case MONITOR_FAILED:
3369                 return;
3370         }
3371
3372
3373         if (rec->need_recovery) {
3374                 /* a previous recovery didn't finish */
3375                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3376                 return;
3377         }
3378
3379         /* verify that all active nodes are in normal mode 
3380            and not in recovery mode 
3381         */
3382         switch (verify_recmode(ctdb, nodemap)) {
3383         case MONITOR_RECOVERY_NEEDED:
3384                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3385                 return;
3386         case MONITOR_FAILED:
3387                 return;
3388         case MONITOR_ELECTION_NEEDED:
3389                 /* can not happen */
3390         case MONITOR_OK:
3391                 break;
3392         }
3393
3394
3395         if (ctdb->tunable.verify_recovery_lock != 0) {
3396                 /* we should have the reclock - check it's not stale */
3397                 ret = check_recovery_lock(ctdb);
3398                 if (ret != 0) {
3399                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3400                         ctdb_set_culprit(rec, ctdb->pnn);
3401                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3402                         return;
3403                 }
3404         }
3405
3406         /* if there are takeovers requested, perform it and notify the waiters */
3407         if (rec->reallocate_callers) {
3408                 process_ipreallocate_requests(ctdb, rec);
3409         }
3410
3411         /* get the nodemap for all active remote nodes
3412          */
3413         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3414         if (remote_nodemaps == NULL) {
3415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3416                 return;
3417         }
3418         for(i=0; i<nodemap->num; i++) {
3419                 remote_nodemaps[i] = NULL;
3420         }
3421         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3422                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3423                 return;
3424         } 
3425
3426         /* verify that all other nodes have the same nodemap as we have
3427         */
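              /* Any mismatch found below - node count, pnn layout or node
                 flags - marks the remote node as culprit for ban accounting
                 and forces a recovery (for flag mismatches, the authoritative
                 flags are pushed to all nodes first). */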
3428         for (j=0; j<nodemap->num; j++) {
3429                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3430                         continue;
3431                 }
3432
3433                 if (remote_nodemaps[j] == NULL) {
3434                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3435                         ctdb_set_culprit(rec, j);
3436
3437                         return;
3438                 }
3439
3440                 /* if the nodes disagree on how many nodes there are
3441                    then this is a good reason to try recovery
3442                  */
3443                 if (remote_nodemaps[j]->num != nodemap->num) {
3444                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3445                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3446                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3447                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3448                         return;
3449                 }
3450
3451                 /* if the nodes disagree on which nodes exist and are
3452                    active, then that is also a good reason to do recovery
3453                  */
3454                 for (i=0;i<nodemap->num;i++) {
3455                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3456                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3457                                           nodemap->nodes[j].pnn, i, 
3458                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3459                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3460                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3461                                             vnnmap);
3462                                 return;
3463                         }
3464                 }
3465
3466                 /* verify the flags are consistent
3467                 */
3468                 for (i=0; i<nodemap->num; i++) {
3469                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3470                                 continue;
3471                         }
3472                         
3473                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3474                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3475                                   nodemap->nodes[j].pnn, 
3476                                   nodemap->nodes[i].pnn, 
3477                                   remote_nodemaps[j]->nodes[i].flags,
3478                                   nodemap->nodes[i].flags));
3479                                 if (i == j) {
3480                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3481                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3482                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3483                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3484                                                     vnnmap);
3485                                         return;
3486                                 } else {
3487                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3488                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3489                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3490                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3491                                                     vnnmap);
3492                                         return;
3493                                 }
3494                         }
3495                 }
3496         }
3497
3498
3499         /* there better be the same number of lmasters in the vnn map
3500            as there are active nodes or we will have to do a recovery
3501          */
3502         if (vnnmap->size != rec->num_active) {
3503                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3504                           vnnmap->size, rec->num_active));
3505                 ctdb_set_culprit(rec, ctdb->pnn);
3506                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3507                 return;
3508         }
3509
3510         /* verify that all active nodes in the nodemap also exist in 
3511            the vnnmap.
3512          */
3513         for (j=0; j<nodemap->num; j++) {
3514                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3515                         continue;
3516                 }
3517                 if (nodemap->nodes[j].pnn == pnn) {
3518                         continue;
3519                 }
3520
3521                 for (i=0; i<vnnmap->size; i++) {
3522                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3523                                 break;
3524                         }
3525                 }
3526                 if (i == vnnmap->size) {
3527                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3528                                   nodemap->nodes[j].pnn));
3529                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3530                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3531                         return;
3532                 }
3533         }
3534
3535         
3536         /* verify that all other nodes have the same vnnmap
3537            and are from the same generation
3538          */
3539         for (j=0; j<nodemap->num; j++) {
3540                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3541                         continue;
3542                 }
3543                 if (nodemap->nodes[j].pnn == pnn) {
3544                         continue;
3545                 }
3546
3547                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3548                                           mem_ctx, &remote_vnnmap);
3549                 if (ret != 0) {
3550                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3551                                   nodemap->nodes[j].pnn));
3552                         return;
3553                 }
3554
3555                 /* verify the vnnmap generation is the same */
3556                 if (vnnmap->generation != remote_vnnmap->generation) {
3557                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3558                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3559                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3560                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3561                         return;
3562                 }
3563
3564                 /* verify the vnnmap size is the same */
3565                 if (vnnmap->size != remote_vnnmap->size) {
3566                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3567                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3568                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3569                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3570                         return;
3571                 }
3572
3573                 /* verify the vnnmap is the same */
3574                 for (i=0;i<vnnmap->size;i++) {
3575                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3576                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3577                                           nodemap->nodes[j].pnn));
3578                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3579                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3580                                             vnnmap);
3581                                 return;
3582                         }
3583                 }
3584         }
3585
3586         /* we might need to change who has what IP assigned */
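              /* The run proceeds as: refresh the public ip lists from all
                 connected nodes, run the "startrecovery" event, call
                 ctdb_takeover_run() to move addresses, then run the
                 "recovered" event.  A failed refresh re-arms
                 need_takeover_run for a later iteration, while a failed
                 "startrecovery" event escalates to a full recovery. */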
3587         if (rec->need_takeover_run) {
3588                 uint32_t culprit = (uint32_t)-1;
3589
3590                 rec->need_takeover_run = false;
3591
3592                 /* update the list of public ips that a node can handle for
3593                    all connected nodes
3594                 */
3595                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3596                 if (ret != 0) {
3597                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3598                                          culprit));
3599                         rec->need_takeover_run = true;
3600                         return;
3601                 }
3602
3603                 /* execute the "startrecovery" event script on all nodes */
3604                 ret = run_startrecovery_eventscript(rec, nodemap);
3605                 if (ret!=0) {
3606                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3607                         ctdb_set_culprit(rec, ctdb->pnn);
3608                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3609                         return;
3610                 }
3611
3612                 ret = ctdb_takeover_run(ctdb, nodemap);
3613                 if (ret != 0) {
3614                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3615                         return;
3616                 }
3617
3618                 /* execute the "recovered" event script on all nodes */
3619                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3620 #if 0
3621 // we can't check whether the event completed successfully
3622 // since this script WILL fail if the node is in recovery mode
3623 // and if that race happens, the code here would just cause a second
3624 // cascading recovery.
3625                 if (ret!=0) {
3626                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3627                         ctdb_set_culprit(rec, ctdb->pnn);
3628                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3629                 }
3630 #endif
3631         }
3632 }
3633
3634 /*
3635   the main monitoring loop
3636  */
3637 static void monitor_cluster(struct ctdb_context *ctdb)
3638 {
3639         struct ctdb_recoverd *rec;
3640
3641         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3642
3643         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3644         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3645
3646         rec->ctdb = ctdb;
3647
3648         rec->priority_time = timeval_current();
3649
3650         /* register a message port for sending memory dumps */
3651         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3652
3653         /* register a message port for recovery elections */
3654         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3655
3656         /* when nodes are disabled/enabled */
3657         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3658
3659         /* when we are asked to push out a flag change */
3660         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3661
3662         /* register a message port for vacuum fetch */
3663         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3664
3665         /* register a message port for reloadnodes  */
3666         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3667
3668         /* register a message port for performing a takeover run */
3669         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3670
3671         /* register a message port for disabling the ip check for a short while */
3672         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3673
3674         /* register a message port for updating the recovery daemons node assignment for an ip */
3675         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3676
3677         /* register a message port for forcing a rebalance of a node at the
3678            next reallocation */
3679         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3680
3681         for (;;) {
3682                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3683                 struct timeval start;
3684                 double elapsed;
3685
3686                 if (!mem_ctx) {
3687                         DEBUG(DEBUG_CRIT,(__location__
3688                                           " Failed to create temp context\n"));
3689                         exit(-1);
3690                 }
3691
3692                 start = timeval_current();
3693                 main_loop(ctdb, rec, mem_ctx);
3694                 talloc_free(mem_ctx);
3695
3696                 /* we only check for recovery once every 'recover_interval' seconds */
3697                 elapsed = timeval_elapsed(&start);
3698                 if (elapsed < ctdb->tunable.recover_interval) {
3699                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3700                                           - elapsed);
3701                 }
3702         }
3703 }
3704
3705 /*
3706   event handler for when the main ctdbd dies
3707  */
3708 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3709                                  uint16_t flags, void *private_data)
3710 {
3711         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3712         _exit(1);
3713 }
3714
3715 /*
3716   called regularly to verify that the recovery daemon is still running
3717  */
3718 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3719                               struct timeval yt, void *p)
3720 {
3721         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3722
3723         if (kill(ctdb->recoverd_pid, 0) != 0) {
3724                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3725
3726                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3727                                 ctdb_restart_recd, ctdb);
3728
3729                 return;
3730         }
3731
3732         event_add_timed(ctdb->ev, ctdb, 
3733                         timeval_current_ofs(30, 0),
3734                         ctdb_check_recd, ctdb);
3735 }
3736
3737 static void recd_sig_child_handler(struct event_context *ev,
3738         struct signal_event *se, int signum, int count,
3739         void *dont_care, 
3740         void *private_data)
3741 {
3742 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3743         int status;
3744         pid_t pid = -1;
3745
3746         while (pid != 0) {
3747                 pid = waitpid(-1, &status, WNOHANG);
3748                 if (pid == -1) {
3749                         if (errno != ECHILD) {
3750                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3751                         }
3752                         return;
3753                 }
3754                 if (pid > 0) {
3755                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3756                 }
3757         }
3758 }
3759
3760 /*
3761   startup the recovery daemon as a child of the main ctdb daemon
3762  */
3763 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3764 {
3765         int fd[2];
3766         struct signal_event *se;
3767         struct tevent_fd *fde;
3768
3769         if (pipe(fd) != 0) {
3770                 return -1;
3771         }
3772
3773         ctdb->ctdbd_pid = getpid();
3774
3775         ctdb->recoverd_pid = fork();
3776         if (ctdb->recoverd_pid == -1) {
3777                 return -1;
3778         }
3779         
3780         if (ctdb->recoverd_pid != 0) {
3781                 close(fd[0]);
3782                 event_add_timed(ctdb->ev, ctdb, 
3783                                 timeval_current_ofs(30, 0),
3784                                 ctdb_check_recd, ctdb);
3785                 return 0;
3786         }
3787
3788         close(fd[1]);
3789
3790         srandom(getpid() ^ time(NULL));
3791
3792         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3793                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3794                 exit(1);
3795         }
3796
3797         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3798
3799         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3800                      ctdb_recoverd_parent, &fd[0]);     
3801         tevent_fd_set_auto_close(fde);
3802
3803         /* set up a handler to pick up sigchld */
3804         se = event_add_signal(ctdb->ev, ctdb,
3805                                      SIGCHLD, 0,
3806                                      recd_sig_child_handler,
3807                                      ctdb);
3808         if (se == NULL) {
3809                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3810                 exit(1);
3811         }
3812
3813         monitor_cluster(ctdb);
3814
3815         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3816         return -1;
3817 }
3818
3819 /*
3820   shutdown the recovery daemon
3821  */
3822 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3823 {
3824         if (ctdb->recoverd_pid == 0) {
3825                 return;
3826         }
3827
3828         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3829         kill(ctdb->recoverd_pid, SIGTERM);
3830 }
3831
3832 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3833                        struct timeval t, void *private_data)
3834 {
3835         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3836
3837         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3838         ctdb_stop_recoverd(ctdb);
3839         ctdb_start_recoverd(ctdb);
3840 }