Merge commit 'origin/master'
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/*
  Linked list of "ctdb ipreallocate" callers that are waiting to be
  called back once the takeover run has finished.
 */
struct ip_reallocate_list {
	/* next waiting caller in the list */
	struct ip_reallocate_list *next;
	/* NOTE(review): presumably identifies where the completion reply
	   for this caller must be sent - confirm against the code that
	   queues these entries */
	struct rd_memdump_reply *rd;
};
41
/*
  Per-node misbehaviour bookkeeping.  Maintained by
  ctdb_set_culprit_count().
 */
struct ctdb_banning_state {
	/* culprit credits accumulated so far */
	uint32_t count;
	/* time at which credits were last added */
	struct timeval last_reported_time;
};
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
/*
  Timeout helpers.  Both macros expand to an expression that reads a
  variable named 'ctdb', so they may only be used where a
  'struct ctdb_context *ctdb' is in scope.
 */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
/* outcome codes used by the monitoring code */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit > ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
/*
  Remember the trouble maker: charge node 'culprit' with a single
  credit.  Thin wrapper around ctdb_set_culprit_count().
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t single_credit = 1;

	ctdb_set_culprit_count(rec, culprit, single_credit);
}
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change recovery master on all node
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have
347    this can fail if the remove node has not yet been upgraded to 
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we dont need to ourself ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* dont check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already have this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                             mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                            mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have all database any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we dont need to ourself ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* dont check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                            remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid,
525                                     bool persistent)
526 {
527         int ret;
528         TDB_DATA outdata;
529         struct ctdb_marshall_buffer *reply;
530         struct ctdb_rec_data *rec;
531         int i;
532         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
533
534         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
535                                CONTROL_TIMEOUT(), &outdata);
536         if (ret != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
538                 talloc_free(tmp_ctx);
539                 return -1;
540         }
541
542         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
543
544         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
545                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549         
550         rec = (struct ctdb_rec_data *)&reply->data[0];
551         
552         for (i=0;
553              i<reply->count;
554              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
555                 TDB_DATA key, data;
556                 struct ctdb_ltdb_header *hdr;
557                 TDB_DATA existing;
558                 
559                 key.dptr = &rec->data[0];
560                 key.dsize = rec->keylen;
561                 data.dptr = &rec->data[key.dsize];
562                 data.dsize = rec->datalen;
563                 
564                 hdr = (struct ctdb_ltdb_header *)data.dptr;
565
566                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
568                         talloc_free(tmp_ctx);
569                         return -1;
570                 }
571
572                 /* fetch the existing record, if any */
573                 existing = tdb_fetch(recdb->tdb, key);
574                 
575                 if (existing.dptr != NULL) {
576                         struct ctdb_ltdb_header header;
577                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
578                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
579                                          (unsigned)existing.dsize, srcnode));
580                                 free(existing.dptr);
581                                 talloc_free(tmp_ctx);
582                                 return -1;
583                         }
584                         header = *(struct ctdb_ltdb_header *)existing.dptr;
585                         free(existing.dptr);
586                         if (!(header.rsn < hdr->rsn ||
587                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
588                                 continue;
589                         }
590                 }
591                 
592                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
593                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
594                         talloc_free(tmp_ctx);
595                         return -1;                              
596                 }
597         }
598
599         talloc_free(tmp_ctx);
600
601         return 0;
602 }
603
604 /*
605   pull all the remote database contents into the recdb
606  */
607 static int pull_remote_database(struct ctdb_context *ctdb,
608                                 struct ctdb_recoverd *rec, 
609                                 struct ctdb_node_map *nodemap, 
610                                 struct tdb_wrap *recdb, uint32_t dbid,
611                                 bool persistent)
612 {
613         int j;
614
615         /* pull all records from all other nodes across onto this node
616            (this merges based on rsn)
617         */
618         for (j=0; j<nodemap->num; j++) {
619                 /* dont merge from nodes that are unavailable */
620                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
621                         continue;
622                 }
623                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
624                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
625                                  nodemap->nodes[j].pnn));
626                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
627                         return -1;
628                 }
629         }
630         
631         return 0;
632 }
633
634
635 /*
636   update flags on all active nodes
637  */
638 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
639 {
640         int ret;
641
642         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
643                 if (ret != 0) {
644                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
645                 return -1;
646         }
647
648         return 0;
649 }
650
651 /*
652   ensure all nodes have the same vnnmap we do
653  */
654 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
655                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
656 {
657         int j, ret;
658
659         /* push the new vnn map out to all the nodes */
660         for (j=0; j<nodemap->num; j++) {
661                 /* dont push to nodes that are unavailable */
662                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
663                         continue;
664                 }
665
666                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
667                 if (ret != 0) {
668                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
669                         return -1;
670                 }
671         }
672
673         return 0;
674 }
675
676
/*
  State of one vacuum fetch in progress.  Instances are linked into
  rec->vacuum_info and unlink themselves in vacuum_info_destructor().
 */
struct vacuum_info {
	/* doubly linked list pointers */
	struct vacuum_info *next;
	struct vacuum_info *prev;
	/* recovery daemon state whose list this entry belongs to */
	struct ctdb_recoverd *rec;
	/* node the records came from */
	uint32_t srcnode;
	/* database the records belong to */
	struct ctdb_db_context *ctdb_db;
	/* marshalled records; recs->count is decremented as records are
	   consumed by vacuum_fetch_next() */
	struct ctdb_marshall_buffer *recs;
	/* next record to process */
	struct ctdb_rec_data *r;
};
685
686 static void vacuum_fetch_next(struct vacuum_info *v);
687
688 /*
689   called when a vacuum fetch has completed - just free it and do the next one
690  */
691 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
692 {
693         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
694         talloc_free(state);
695         vacuum_fetch_next(v);
696 }
697
698
699 /*
700   process the next element from the vacuum list
701 */
702 static void vacuum_fetch_next(struct vacuum_info *v)
703 {
704         struct ctdb_call call;
705         struct ctdb_rec_data *r;
706
707         while (v->recs->count) {
708                 struct ctdb_client_call_state *state;
709                 TDB_DATA data;
710                 struct ctdb_ltdb_header *hdr;
711
712                 ZERO_STRUCT(call);
713                 call.call_id = CTDB_NULL_FUNC;
714                 call.flags = CTDB_IMMEDIATE_MIGRATION;
715
716                 r = v->r;
717                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
718                 v->recs->count--;
719
720                 call.key.dptr = &r->data[0];
721                 call.key.dsize = r->keylen;
722
723                 /* ensure we don't block this daemon - just skip a record if we can't get
724                    the chainlock */
725                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
726                         continue;
727                 }
728
729                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
730                 if (data.dptr == NULL) {
731                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
732                         continue;
733                 }
734
735                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
736                         free(data.dptr);
737                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
738                         continue;
739                 }
740                 
741                 hdr = (struct ctdb_ltdb_header *)data.dptr;
742                 if (hdr->dmaster == v->rec->ctdb->pnn) {
743                         /* its already local */
744                         free(data.dptr);
745                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                         continue;
747                 }
748
749                 free(data.dptr);
750
751                 state = ctdb_call_send(v->ctdb_db, &call);
752                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
753                 if (state == NULL) {
754                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
755                         talloc_free(v);
756                         return;
757                 }
758                 state->async.fn = vacuum_fetch_callback;
759                 state->async.private_data = v;
760                 return;
761         }
762
763         talloc_free(v);
764 }
765
766
767 /*
768   destroy a vacuum info structure
769  */
770 static int vacuum_info_destructor(struct vacuum_info *v)
771 {
772         DLIST_REMOVE(v->rec->vacuum_info, v);
773         return 0;
774 }
775
776
777 /*
778   handler for vacuum fetch
779 */
780 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
781                                  TDB_DATA data, void *private_data)
782 {
783         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
784         struct ctdb_marshall_buffer *recs;
785         int ret, i;
786         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
787         const char *name;
788         struct ctdb_dbid_map *dbmap=NULL;
789         bool persistent = false;
790         struct ctdb_db_context *ctdb_db;
791         struct ctdb_rec_data *r;
792         uint32_t srcnode;
793         struct vacuum_info *v;
794
795         recs = (struct ctdb_marshall_buffer *)data.dptr;
796         r = (struct ctdb_rec_data *)&recs->data[0];
797
798         if (recs->count == 0) {
799                 talloc_free(tmp_ctx);
800                 return;
801         }
802
803         srcnode = r->reqid;
804
805         for (v=rec->vacuum_info;v;v=v->next) {
806                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
807                         /* we're already working on records from this node */
808                         talloc_free(tmp_ctx);
809                         return;
810                 }
811         }
812
813         /* work out if the database is persistent */
814         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
815         if (ret != 0) {
816                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
817                 talloc_free(tmp_ctx);
818                 return;
819         }
820
821         for (i=0;i<dbmap->num;i++) {
822                 if (dbmap->dbs[i].dbid == recs->db_id) {
823                         persistent = dbmap->dbs[i].persistent;
824                         break;
825                 }
826         }
827         if (i == dbmap->num) {
828                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
829                 talloc_free(tmp_ctx);
830                 return;         
831         }
832
833         /* find the name of this database */
834         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
835                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
836                 talloc_free(tmp_ctx);
837                 return;
838         }
839
840         /* attach to it */
841         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
842         if (ctdb_db == NULL) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
844                 talloc_free(tmp_ctx);
845                 return;
846         }
847
848         v = talloc_zero(rec, struct vacuum_info);
849         if (v == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(tmp_ctx);
852                 return;
853         }
854
855         v->rec = rec;
856         v->srcnode = srcnode;
857         v->ctdb_db = ctdb_db;
858         v->recs = talloc_memdup(v, recs, data.dsize);
859         if (v->recs == NULL) {
860                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
861                 talloc_free(v);
862                 talloc_free(tmp_ctx);
863                 return;         
864         }
865         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
866
867         DLIST_ADD(rec->vacuum_info, v);
868
869         talloc_set_destructor(v, vacuum_info_destructor);
870
871         vacuum_fetch_next(v);
872         talloc_free(tmp_ctx);
873 }
874
875
/*
  timer callback used by ctdb_wait_timeout: raises the caller's
  "timed out" flag, which is passed in through p
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
			      struct timeval yt, void *p)
{
	uint32_t *expired_flag = p;

	*expired_flag = 1;
}
885
886 /*
887   wait for a given number of seconds
888  */
889 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
890 {
891         uint32_t timed_out = 0;
892         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
893         while (!timed_out) {
894                 event_loop_once(ctdb->ev);
895         }
896 }
897
898 /*
899   called when an election times out (ends)
900  */
901 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
902                                   struct timeval t, void *p)
903 {
904         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
905         rec->election_timeout = NULL;
906
907         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
908 }
909
910
911 /*
912   wait for an election to finish. It finished election_timeout seconds after
913   the last election packet is received
914  */
915 static void ctdb_wait_election(struct ctdb_recoverd *rec)
916 {
917         struct ctdb_context *ctdb = rec->ctdb;
918         while (rec->election_timeout) {
919                 event_loop_once(ctdb->ev);
920         }
921 }
922
923 /*
924   Update our local flags from all remote connected nodes. 
925   This is only run when we are or we belive we are the recovery master
926  */
927 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
928 {
929         int j;
930         struct ctdb_context *ctdb = rec->ctdb;
931         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
932
933         /* get the nodemap for all active remote nodes and verify
934            they are the same as for this node
935          */
936         for (j=0; j<nodemap->num; j++) {
937                 struct ctdb_node_map *remote_nodemap=NULL;
938                 int ret;
939
940                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
941                         continue;
942                 }
943                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
944                         continue;
945                 }
946
947                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
948                                            mem_ctx, &remote_nodemap);
949                 if (ret != 0) {
950                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
951                                   nodemap->nodes[j].pnn));
952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
953                         talloc_free(mem_ctx);
954                         return MONITOR_FAILED;
955                 }
956                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
957                         /* We should tell our daemon about this so it
958                            updates its flags or else we will log the same 
959                            message again in the next iteration of recovery.
960                            Since we are the recovery master we can just as
961                            well update the flags on all nodes.
962                         */
963                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
964                         if (ret != 0) {
965                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
966                                 return -1;
967                         }
968
969                         /* Update our local copy of the flags in the recovery
970                            daemon.
971                         */
972                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
973                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
974                                  nodemap->nodes[j].flags));
975                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
976                 }
977                 talloc_free(remote_nodemap);
978         }
979         talloc_free(mem_ctx);
980         return MONITOR_OK;
981 }
982
983
984 /* Create a new random generation ip. 
985    The generation id can not be the INVALID_GENERATION id
986 */
987 static uint32_t new_generation(void)
988 {
989         uint32_t generation;
990
991         while (1) {
992                 generation = random();
993
994                 if (generation != INVALID_GENERATION) {
995                         break;
996                 }
997         }
998
999         return generation;
1000 }
1001
1002
1003 /*
1004   create a temporary working database
1005  */
1006 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1007 {
1008         char *name;
1009         struct tdb_wrap *recdb;
1010         unsigned tdb_flags;
1011
1012         /* open up the temporary recovery database */
1013         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1014                                ctdb->db_directory_state,
1015                                ctdb->pnn);
1016         if (name == NULL) {
1017                 return NULL;
1018         }
1019         unlink(name);
1020
1021         tdb_flags = TDB_NOLOCK;
1022         if (ctdb->valgrinding) {
1023                 tdb_flags |= TDB_NOMMAP;
1024         }
1025         tdb_flags |= TDB_DISALLOW_NESTING;
1026
1027         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1028                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1029         if (recdb == NULL) {
1030                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1031         }
1032
1033         talloc_free(name);
1034
1035         return recdb;
1036 }
1037
1038
1039 /* 
1040    a traverse function for pulling all relevent records from recdb
1041  */
1042 struct recdb_data {
1043         struct ctdb_context *ctdb;
1044         struct ctdb_marshall_buffer *recdata;
1045         uint32_t len;
1046         bool failed;
1047         bool persistent;
1048 };
1049
1050 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1051 {
1052         struct recdb_data *params = (struct recdb_data *)p;
1053         struct ctdb_rec_data *rec;
1054         struct ctdb_ltdb_header *hdr;
1055
1056         /* skip empty records */
1057         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1058                 return 0;
1059         }
1060
1061         /* update the dmaster field to point to us */
1062         hdr = (struct ctdb_ltdb_header *)data.dptr;
1063         if (!params->persistent) {
1064                 hdr->dmaster = params->ctdb->pnn;
1065         }
1066
1067         /* add the record to the blob ready to send to the nodes */
1068         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1069         if (rec == NULL) {
1070                 params->failed = true;
1071                 return -1;
1072         }
1073         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1074         if (params->recdata == NULL) {
1075                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1076                          rec->length + params->len, params->recdata->count));
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata->count++;
1081         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1082         params->len += rec->length;
1083         talloc_free(rec);
1084
1085         return 0;
1086 }
1087
1088 /*
1089   push the recdb database out to all nodes
1090  */
1091 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1092                                bool persistent,
1093                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1094 {
1095         struct recdb_data params;
1096         struct ctdb_marshall_buffer *recdata;
1097         TDB_DATA outdata;
1098         TALLOC_CTX *tmp_ctx;
1099         uint32_t *nodes;
1100
1101         tmp_ctx = talloc_new(ctdb);
1102         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1103
1104         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1105         CTDB_NO_MEMORY(ctdb, recdata);
1106
1107         recdata->db_id = dbid;
1108
1109         params.ctdb = ctdb;
1110         params.recdata = recdata;
1111         params.len = offsetof(struct ctdb_marshall_buffer, data);
1112         params.failed = false;
1113         params.persistent = persistent;
1114
1115         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1116                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1117                 talloc_free(params.recdata);
1118                 talloc_free(tmp_ctx);
1119                 return -1;
1120         }
1121
1122         if (params.failed) {
1123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1124                 talloc_free(params.recdata);
1125                 talloc_free(tmp_ctx);
1126                 return -1;              
1127         }
1128
1129         recdata = params.recdata;
1130
1131         outdata.dptr = (void *)recdata;
1132         outdata.dsize = params.len;
1133
1134         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1135         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1136                                         nodes, 0,
1137                                         CONTROL_TIMEOUT(), false, outdata,
1138                                         NULL, NULL,
1139                                         NULL) != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1141                 talloc_free(recdata);
1142                 talloc_free(tmp_ctx);
1143                 return -1;
1144         }
1145
1146         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1147                   dbid, recdata->count));
1148
1149         talloc_free(recdata);
1150         talloc_free(tmp_ctx);
1151
1152         return 0;
1153 }
1154
1155
1156 /*
1157   go through a full recovery on one database 
1158  */
1159 static int recover_database(struct ctdb_recoverd *rec, 
1160                             TALLOC_CTX *mem_ctx,
1161                             uint32_t dbid,
1162                             bool persistent,
1163                             uint32_t pnn, 
1164                             struct ctdb_node_map *nodemap,
1165                             uint32_t transaction_id)
1166 {
1167         struct tdb_wrap *recdb;
1168         int ret;
1169         struct ctdb_context *ctdb = rec->ctdb;
1170         TDB_DATA data;
1171         struct ctdb_control_wipe_database w;
1172         uint32_t *nodes;
1173
1174         recdb = create_recdb(ctdb, mem_ctx);
1175         if (recdb == NULL) {
1176                 return -1;
1177         }
1178
1179         /* pull all remote databases onto the recdb */
1180         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1181         if (ret != 0) {
1182                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1183                 return -1;
1184         }
1185
1186         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1187
1188         /* wipe all the remote databases. This is safe as we are in a transaction */
1189         w.db_id = dbid;
1190         w.transaction_id = transaction_id;
1191
1192         data.dptr = (void *)&w;
1193         data.dsize = sizeof(w);
1194
1195         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1197                                         nodes, 0,
1198                                         CONTROL_TIMEOUT(), false, data,
1199                                         NULL, NULL,
1200                                         NULL) != 0) {
1201                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1202                 talloc_free(recdb);
1203                 return -1;
1204         }
1205         
1206         /* push out the correct database. This sets the dmaster and skips 
1207            the empty records */
1208         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1209         if (ret != 0) {
1210                 talloc_free(recdb);
1211                 return -1;
1212         }
1213
1214         /* all done with this database */
1215         talloc_free(recdb);
1216
1217         return 0;
1218 }
1219
1220 /*
1221   reload the nodes file 
1222 */
1223 static void reload_nodes_file(struct ctdb_context *ctdb)
1224 {
1225         ctdb->nodes = NULL;
1226         ctdb_load_nodes_file(ctdb);
1227 }
1228
1229         
1230 /*
1231   we are the recmaster, and recovery is needed - start a recovery run
1232  */
1233 static int do_recovery(struct ctdb_recoverd *rec, 
1234                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1235                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1236 {
1237         struct ctdb_context *ctdb = rec->ctdb;
1238         int i, j, ret;
1239         uint32_t generation;
1240         struct ctdb_dbid_map *dbmap;
1241         TDB_DATA data;
1242         uint32_t *nodes;
1243         struct timeval start_time;
1244
1245         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1246
1247         /* if recovery fails, force it again */
1248         rec->need_recovery = true;
1249
1250         for (i=0; i<ctdb->num_nodes; i++) {
1251                 struct ctdb_banning_state *ban_state;
1252
1253                 if (ctdb->nodes[i]->ban_state == NULL) {
1254                         continue;
1255                 }
1256                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1257                 if (ban_state->count < 2*ctdb->num_nodes) {
1258                         continue;
1259                 }
1260                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1261                         ctdb->nodes[i]->pnn, ban_state->count,
1262                         ctdb->tunable.recovery_ban_period));
1263                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1264                 ban_state->count = 0;
1265         }
1266
1267
1268         if (ctdb->tunable.verify_recovery_lock != 0) {
1269                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1270                 start_time = timeval_current();
1271                 if (!ctdb_recovery_lock(ctdb, true)) {
1272                         ctdb_set_culprit(rec, pnn);
1273                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1274                         return -1;
1275                 }
1276                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1277                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1278         }
1279
1280         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1281
1282         /* get a list of all databases */
1283         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1284         if (ret != 0) {
1285                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1286                 return -1;
1287         }
1288
1289         /* we do the db creation before we set the recovery mode, so the freeze happens
1290            on all databases we will be dealing with. */
1291
1292         /* verify that we have all the databases any other node has */
1293         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1296                 return -1;
1297         }
1298
1299         /* verify that all other nodes have all our databases */
1300         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1301         if (ret != 0) {
1302                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1303                 return -1;
1304         }
1305         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1306
1307         /* update the database priority for all remote databases */
1308         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1309         if (ret != 0) {
1310                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1311         }
1312         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1313
1314
1315         /* set recovery mode to active on all nodes */
1316         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1317         if (ret != 0) {
1318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1319                 return -1;
1320         }
1321
1322         /* execute the "startrecovery" event script on all nodes */
1323         ret = run_startrecovery_eventscript(rec, nodemap);
1324         if (ret!=0) {
1325                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1326                 return -1;
1327         }
1328
1329         /*
1330           update all nodes to have the same flags that we have
1331          */
1332         for (i=0;i<nodemap->num;i++) {
1333                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1334                         continue;
1335                 }
1336
1337                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1338                 if (ret != 0) {
1339                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1340                         return -1;
1341                 }
1342         }
1343
1344         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1345
1346         /* pick a new generation number */
1347         generation = new_generation();
1348
1349         /* change the vnnmap on this node to use the new generation 
1350            number but not on any other nodes.
1351            this guarantees that if we abort the recovery prematurely
1352            for some reason (a node stops responding?)
1353            that we can just return immediately and we will reenter
1354            recovery shortly again.
1355            I.e. we deliberately leave the cluster with an inconsistent
1356            generation id to allow us to abort recovery at any stage and
1357            just restart it from scratch.
1358          */
1359         vnnmap->generation = generation;
1360         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1363                 return -1;
1364         }
1365
1366         data.dptr = (void *)&generation;
1367         data.dsize = sizeof(uint32_t);
1368
1369         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1370         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1371                                         nodes, 0,
1372                                         CONTROL_TIMEOUT(), false, data,
1373                                         NULL,
1374                                         transaction_start_fail_callback,
1375                                         rec) != 0) {
1376                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1377                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1378                                         nodes, 0,
1379                                         CONTROL_TIMEOUT(), false, tdb_null,
1380                                         NULL,
1381                                         NULL,
1382                                         NULL) != 0) {
1383                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1384                 }
1385                 return -1;
1386         }
1387
1388         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1389
1390         for (i=0;i<dbmap->num;i++) {
1391                 ret = recover_database(rec, mem_ctx,
1392                                        dbmap->dbs[i].dbid,
1393                                        dbmap->dbs[i].persistent,
1394                                        pnn, nodemap, generation);
1395                 if (ret != 0) {
1396                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1397                         return -1;
1398                 }
1399         }
1400
1401         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1402
1403         /* commit all the changes */
1404         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1405                                         nodes, 0,
1406                                         CONTROL_TIMEOUT(), false, data,
1407                                         NULL, NULL,
1408                                         NULL) != 0) {
1409                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1410                 return -1;
1411         }
1412
1413         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1414         
1415
1416         /* update the capabilities for all nodes */
1417         ret = update_capabilities(ctdb, nodemap);
1418         if (ret!=0) {
1419                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1420                 return -1;
1421         }
1422
1423         /* build a new vnn map with all the currently active and
1424            unbanned nodes */
1425         generation = new_generation();
1426         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1427         CTDB_NO_MEMORY(ctdb, vnnmap);
1428         vnnmap->generation = generation;
1429         vnnmap->size = 0;
1430         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1431         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1432         for (i=j=0;i<nodemap->num;i++) {
1433                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1434                         continue;
1435                 }
1436                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1437                         /* this node can not be an lmaster */
1438                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1439                         continue;
1440                 }
1441
1442                 vnnmap->size++;
1443                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1444                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1445                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1446
1447         }
1448         if (vnnmap->size == 0) {
1449                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1450                 vnnmap->size++;
1451                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1452                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1453                 vnnmap->map[0] = pnn;
1454         }       
1455
1456         /* update to the new vnnmap on all nodes */
1457         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1458         if (ret != 0) {
1459                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1460                 return -1;
1461         }
1462
1463         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1464
1465         /* update recmaster to point to us for all nodes */
1466         ret = set_recovery_master(ctdb, nodemap, pnn);
1467         if (ret!=0) {
1468                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1469                 return -1;
1470         }
1471
1472         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1473
1474         /*
1475           update all nodes to have the same flags that we have
1476          */
1477         for (i=0;i<nodemap->num;i++) {
1478                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1479                         continue;
1480                 }
1481
1482                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1483                 if (ret != 0) {
1484                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1485                         return -1;
1486                 }
1487         }
1488
1489         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1490
1491         /* disable recovery mode */
1492         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1493         if (ret != 0) {
1494                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1495                 return -1;
1496         }
1497
1498         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1499
1500         /*
1501           tell nodes to takeover their public IPs
1502          */
1503         rec->need_takeover_run = false;
1504         ret = ctdb_takeover_run(ctdb, nodemap);
1505         if (ret != 0) {
1506                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1507                 return -1;
1508         }
1509         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1510
1511         /* execute the "recovered" event script on all nodes */
1512         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1513         if (ret!=0) {
1514                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1515                 return -1;
1516         }
1517
1518         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1519
1520         /* send a message to all clients telling them that the cluster 
1521            has been reconfigured */
1522         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1523
1524         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1525
1526         rec->need_recovery = false;
1527
1528         /* we managed to complete a full recovery, make sure to forgive
1529            any past sins by the nodes that could now participate in the
1530            recovery.
1531         */
1532         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1533         for (i=0;i<nodemap->num;i++) {
1534                 struct ctdb_banning_state *ban_state;
1535
1536                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1537                         continue;
1538                 }
1539
1540                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1541                 if (ban_state == NULL) {
1542                         continue;
1543                 }
1544
1545                 ban_state->count = 0;
1546         }
1547
1548
1549         /* We just finished a recovery successfully. 
1550            We now wait for rerecovery_timeout before we allow 
1551            another recovery to take place.
1552         */
1553         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1554         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1555         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1556
1557         return 0;
1558 }
1559
1560
/*
  payload of a recovery master election message.

  elections are won by first checking the number of connected nodes, then
  the priority time, then the pnn

  send_election_request() transmits this struct as raw bytes with
  dsize == sizeof(struct election_message), so the field order and
  sizes are part of the message format.
 */
struct election_message {
        uint32_t num_connected;        /* nodes the sender does not see as disconnected */
        struct timeval priority_time;  /* sender's priority time, compared second */
        uint32_t pnn;                  /* sender's node number, the final tie-breaker */
        uint32_t node_flags;           /* sender's own node flags */
};
1571
1572 /*
1573   form this nodes election data
1574  */
1575 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1576 {
1577         int ret, i;
1578         struct ctdb_node_map *nodemap;
1579         struct ctdb_context *ctdb = rec->ctdb;
1580
1581         ZERO_STRUCTP(em);
1582
1583         em->pnn = rec->ctdb->pnn;
1584         em->priority_time = rec->priority_time;
1585
1586         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1587         if (ret != 0) {
1588                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1589                 return;
1590         }
1591
1592         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1593         em->node_flags = rec->node_flags;
1594
1595         for (i=0;i<nodemap->num;i++) {
1596                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1597                         em->num_connected++;
1598                 }
1599         }
1600
1601         /* we shouldnt try to win this election if we cant be a recmaster */
1602         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1603                 em->num_connected = 0;
1604                 em->priority_time = timeval_current();
1605         }
1606
1607         talloc_free(nodemap);
1608 }
1609
1610 /*
1611   see if the given election data wins
1612  */
1613 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1614 {
1615         struct election_message myem;
1616         int cmp = 0;
1617
1618         ctdb_election_data(rec, &myem);
1619
1620         /* we cant win if we dont have the recmaster capability */
1621         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1622                 return false;
1623         }
1624
1625         /* we cant win if we are banned */
1626         if (rec->node_flags & NODE_FLAGS_BANNED) {
1627                 return false;
1628         }       
1629
1630         /* we cant win if we are stopped */
1631         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1632                 return false;
1633         }       
1634
1635         /* we will automatically win if the other node is banned */
1636         if (em->node_flags & NODE_FLAGS_BANNED) {
1637                 return true;
1638         }
1639
1640         /* we will automatically win if the other node is banned */
1641         if (em->node_flags & NODE_FLAGS_STOPPED) {
1642                 return true;
1643         }
1644
1645         /* try to use the most connected node */
1646         if (cmp == 0) {
1647                 cmp = (int)myem.num_connected - (int)em->num_connected;
1648         }
1649
1650         /* then the longest running node */
1651         if (cmp == 0) {
1652                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1653         }
1654
1655         if (cmp == 0) {
1656                 cmp = (int)myem.pnn - (int)em->pnn;
1657         }
1658
1659         return cmp > 0;
1660 }
1661
1662 /*
1663   send out an election request
1664  */
1665 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1666 {
1667         int ret;
1668         TDB_DATA election_data;
1669         struct election_message emsg;
1670         uint64_t srvid;
1671         struct ctdb_context *ctdb = rec->ctdb;
1672
1673         srvid = CTDB_SRVID_RECOVERY;
1674
1675         ctdb_election_data(rec, &emsg);
1676
1677         election_data.dsize = sizeof(struct election_message);
1678         election_data.dptr  = (unsigned char *)&emsg;
1679
1680
1681         /* send an election message to all active nodes */
1682         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1683         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1684
1685
1686         /* A new node that is already frozen has entered the cluster.
1687            The existing nodes are not frozen and dont need to be frozen
1688            until the election has ended and we start the actual recovery
1689         */
1690         if (update_recmaster == true) {
1691                 /* first we assume we will win the election and set 
1692                    recoverymaster to be ourself on the current node
1693                  */
1694                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1695                 if (ret != 0) {
1696                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1697                         return -1;
1698                 }
1699         }
1700
1701
1702         return 0;
1703 }
1704
1705 /*
1706   this function will unban all nodes in the cluster
1707 */
1708 static void unban_all_nodes(struct ctdb_context *ctdb)
1709 {
1710         int ret, i;
1711         struct ctdb_node_map *nodemap;
1712         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1713         
1714         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1715         if (ret != 0) {
1716                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1717                 return;
1718         }
1719
1720         for (i=0;i<nodemap->num;i++) {
1721                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1722                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1723                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1724                 }
1725         }
1726
1727         talloc_free(tmp_ctx);
1728 }
1729
1730
1731 /*
1732   we think we are winning the election - send a broadcast election request
1733  */
1734 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1735 {
1736         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1737         int ret;
1738
1739         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1740         if (ret != 0) {
1741                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1742         }
1743
1744         talloc_free(rec->send_election_te);
1745         rec->send_election_te = NULL;
1746 }
1747
1748 /*
1749   handler for memory dumps
1750 */
1751 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1752                              TDB_DATA data, void *private_data)
1753 {
1754         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1755         TDB_DATA *dump;
1756         int ret;
1757         struct rd_memdump_reply *rd;
1758
1759         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1760                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1761                 talloc_free(tmp_ctx);
1762                 return;
1763         }
1764         rd = (struct rd_memdump_reply *)data.dptr;
1765
1766         dump = talloc_zero(tmp_ctx, TDB_DATA);
1767         if (dump == NULL) {
1768                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1769                 talloc_free(tmp_ctx);
1770                 return;
1771         }
1772         ret = ctdb_dump_memory(ctdb, dump);
1773         if (ret != 0) {
1774                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1775                 talloc_free(tmp_ctx);
1776                 return;
1777         }
1778
1779 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1780
1781         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1782         if (ret != 0) {
1783                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1784                 talloc_free(tmp_ctx);
1785                 return;
1786         }
1787
1788         talloc_free(tmp_ctx);
1789 }
1790
1791 /*
1792   handler for reload_nodes
1793 */
1794 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1795                              TDB_DATA data, void *private_data)
1796 {
1797         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1798
1799         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1800
1801         reload_nodes_file(rec->ctdb);
1802 }
1803
1804
1805 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1806                               struct timeval yt, void *p)
1807 {
1808         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1809
1810         talloc_free(rec->ip_check_disable_ctx);
1811         rec->ip_check_disable_ctx = NULL;
1812 }
1813
1814 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1815                              TDB_DATA data, void *private_data)
1816 {
1817         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1818         uint32_t timeout;
1819
1820         if (rec->ip_check_disable_ctx != NULL) {
1821                 talloc_free(rec->ip_check_disable_ctx);
1822                 rec->ip_check_disable_ctx = NULL;
1823         }
1824
1825         if (data.dsize != sizeof(uint32_t)) {
1826                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1827                                  "expexting %lu\n", (long unsigned)data.dsize,
1828                                  (long unsigned)sizeof(uint32_t)));
1829                 return;
1830         }
1831         if (data.dptr == NULL) {
1832                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1833                 return;
1834         }
1835
1836         timeout = *((uint32_t *)data.dptr);
1837         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1838
1839         rec->ip_check_disable_ctx = talloc_new(rec);
1840         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1841
1842         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1843 }
1844
1845
1846 /*
1847   handler for ip reallocate, just add it to the list of callers and 
1848   handle this later in the monitor_cluster loop so we do not recurse
1849   with other callers to takeover_run()
1850 */
1851 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1852                              TDB_DATA data, void *private_data)
1853 {
1854         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1855         struct ip_reallocate_list *caller;
1856
1857         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1858                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1859                 return;
1860         }
1861
1862         if (rec->ip_reallocate_ctx == NULL) {
1863                 rec->ip_reallocate_ctx = talloc_new(rec);
1864                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1865         }
1866
1867         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1868         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1869
1870         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1871         caller->next = rec->reallocate_callers;
1872         rec->reallocate_callers = caller;
1873
1874         return;
1875 }
1876
1877 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1878 {
1879         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1880         TDB_DATA result;
1881         int32_t ret;
1882         struct ip_reallocate_list *callers;
1883
1884         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1885         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1886         result.dsize = sizeof(int32_t);
1887         result.dptr  = (uint8_t *)&ret;
1888
1889         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1890
1891                 /* Someone that sent srvid==0 does not want a reply */
1892                 if (callers->rd->srvid == 0) {
1893                         continue;
1894                 }
1895                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1896                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1897                                   (unsigned long long)callers->rd->srvid));
1898                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1899                 if (ret != 0) {
1900                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1901                                          "message to %u:%llu\n",
1902                                          (unsigned)callers->rd->pnn,
1903                                          (unsigned long long)callers->rd->srvid));
1904                 }
1905         }
1906
1907         talloc_free(tmp_ctx);
1908         talloc_free(rec->ip_reallocate_ctx);
1909         rec->ip_reallocate_ctx = NULL;
1910         rec->reallocate_callers = NULL;
1911         
1912 }
1913
1914
1915 /*
1916   handler for recovery master elections
1917 */
1918 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1919                              TDB_DATA data, void *private_data)
1920 {
1921         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1922         int ret;
1923         struct election_message *em = (struct election_message *)data.dptr;
1924         TALLOC_CTX *mem_ctx;
1925
1926         /* we got an election packet - update the timeout for the election */
1927         talloc_free(rec->election_timeout);
1928         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1929                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1930                                                 ctdb_election_timeout, rec);
1931
1932         mem_ctx = talloc_new(ctdb);
1933
1934         /* someone called an election. check their election data
1935            and if we disagree and we would rather be the elected node, 
1936            send a new election message to all other nodes
1937          */
1938         if (ctdb_election_win(rec, em)) {
1939                 if (!rec->send_election_te) {
1940                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1941                                                                 timeval_current_ofs(0, 500000),
1942                                                                 election_send_request, rec);
1943                 }
1944                 talloc_free(mem_ctx);
1945                 /*unban_all_nodes(ctdb);*/
1946                 return;
1947         }
1948         
1949         /* we didn't win */
1950         talloc_free(rec->send_election_te);
1951         rec->send_election_te = NULL;
1952
1953         if (ctdb->tunable.verify_recovery_lock != 0) {
1954                 /* release the recmaster lock */
1955                 if (em->pnn != ctdb->pnn &&
1956                     ctdb->recovery_lock_fd != -1) {
1957                         close(ctdb->recovery_lock_fd);
1958                         ctdb->recovery_lock_fd = -1;
1959                         unban_all_nodes(ctdb);
1960                 }
1961         }
1962
1963         /* ok, let that guy become recmaster then */
1964         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1965         if (ret != 0) {
1966                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1967                 talloc_free(mem_ctx);
1968                 return;
1969         }
1970
1971         talloc_free(mem_ctx);
1972         return;
1973 }
1974
1975
1976 /*
1977   force the start of the election process
1978  */
1979 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1980                            struct ctdb_node_map *nodemap)
1981 {
1982         int ret;
1983         struct ctdb_context *ctdb = rec->ctdb;
1984
1985         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1986
1987         /* set all nodes to recovery mode to stop all internode traffic */
1988         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1989         if (ret != 0) {
1990                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1991                 return;
1992         }
1993
1994         talloc_free(rec->election_timeout);
1995         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1996                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1997                                                 ctdb_election_timeout, rec);
1998
1999         ret = send_election_request(rec, pnn, true);
2000         if (ret!=0) {
2001                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2002                 return;
2003         }
2004
2005         /* wait for a few seconds to collect all responses */
2006         ctdb_wait_election(rec);
2007 }
2008
2009
2010
2011 /*
2012   handler for when a node changes its flags
2013 */
2014 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2015                             TDB_DATA data, void *private_data)
2016 {
2017         int ret;
2018         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2019         struct ctdb_node_map *nodemap=NULL;
2020         TALLOC_CTX *tmp_ctx;
2021         uint32_t changed_flags;
2022         int i;
2023         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2024         int disabled_flag_changed;
2025
2026         if (data.dsize != sizeof(*c)) {
2027                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2028                 return;
2029         }
2030
2031         tmp_ctx = talloc_new(ctdb);
2032         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2033
2034         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2035         if (ret != 0) {
2036                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2037                 talloc_free(tmp_ctx);
2038                 return;         
2039         }
2040
2041
2042         for (i=0;i<nodemap->num;i++) {
2043                 if (nodemap->nodes[i].pnn == c->pnn) break;
2044         }
2045
2046         if (i == nodemap->num) {
2047                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2048                 talloc_free(tmp_ctx);
2049                 return;
2050         }
2051
2052         changed_flags = c->old_flags ^ c->new_flags;
2053
2054         if (nodemap->nodes[i].flags != c->new_flags) {
2055                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2056         }
2057
2058         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2059
2060         nodemap->nodes[i].flags = c->new_flags;
2061
2062         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2063                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2064
2065         if (ret == 0) {
2066                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2067                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2068         }
2069         
2070         if (ret == 0 &&
2071             ctdb->recovery_master == ctdb->pnn &&
2072             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2073                 /* Only do the takeover run if the perm disabled or unhealthy
2074                    flags changed since these will cause an ip failover but not
2075                    a recovery.
2076                    If the node became disconnected or banned this will also
2077                    lead to an ip address failover but that is handled 
2078                    during recovery
2079                 */
2080                 if (disabled_flag_changed) {
2081                         rec->need_takeover_run = true;
2082                 }
2083         }
2084
2085         talloc_free(tmp_ctx);
2086 }
2087
2088 /*
2089   handler for when we need to push out flag changes ot all other nodes
2090 */
2091 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2092                             TDB_DATA data, void *private_data)
2093 {
2094         int ret;
2095         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2096         struct ctdb_node_map *nodemap=NULL;
2097         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2098         uint32_t recmaster;
2099         uint32_t *nodes;
2100
2101         /* find the recovery master */
2102         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2103         if (ret != 0) {
2104                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2105                 talloc_free(tmp_ctx);
2106                 return;
2107         }
2108
2109         /* read the node flags from the recmaster */
2110         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2111         if (ret != 0) {
2112                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2113                 talloc_free(tmp_ctx);
2114                 return;
2115         }
2116         if (c->pnn >= nodemap->num) {
2117                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2118                 talloc_free(tmp_ctx);
2119                 return;
2120         }
2121
2122         /* send the flags update to all connected nodes */
2123         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2124
2125         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2126                                       nodes, 0, CONTROL_TIMEOUT(),
2127                                       false, data,
2128                                       NULL, NULL,
2129                                       NULL) != 0) {
2130                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2131
2132                 talloc_free(tmp_ctx);
2133                 return;
2134         }
2135
2136         talloc_free(tmp_ctx);
2137 }
2138
2139
/*
  state shared between verify_recmode() and
  verify_recmode_normal_callback() while the getrecmode controls are
  in flight
 */
struct verify_recmode_normal_data {
        uint32_t count;                 /* controls sent that have not completed yet */
        enum monitor_result status;     /* combined result; starts out as MONITOR_OK */
};
2144
2145 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2146 {
2147         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2148
2149
2150         /* one more node has responded with recmode data*/
2151         rmdata->count--;
2152
2153         /* if we failed to get the recmode, then return an error and let
2154            the main loop try again.
2155         */
2156         if (state->state != CTDB_CONTROL_DONE) {
2157                 if (rmdata->status == MONITOR_OK) {
2158                         rmdata->status = MONITOR_FAILED;
2159                 }
2160                 return;
2161         }
2162
2163         /* if we got a response, then the recmode will be stored in the
2164            status field
2165         */
2166         if (state->status != CTDB_RECOVERY_NORMAL) {
2167                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2168                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2169         }
2170
2171         return;
2172 }
2173
2174
2175 /* verify that all nodes are in normal recovery mode */
2176 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2177 {
2178         struct verify_recmode_normal_data *rmdata;
2179         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2180         struct ctdb_client_control_state *state;
2181         enum monitor_result status;
2182         int j;
2183         
2184         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2185         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2186         rmdata->count  = 0;
2187         rmdata->status = MONITOR_OK;
2188
2189         /* loop over all active nodes and send an async getrecmode call to 
2190            them*/
2191         for (j=0; j<nodemap->num; j++) {
2192                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2193                         continue;
2194                 }
2195                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2196                                         CONTROL_TIMEOUT(), 
2197                                         nodemap->nodes[j].pnn);
2198                 if (state == NULL) {
2199                         /* we failed to send the control, treat this as 
2200                            an error and try again next iteration
2201                         */                      
2202                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2203                         talloc_free(mem_ctx);
2204                         return MONITOR_FAILED;
2205                 }
2206
2207                 /* set up the callback functions */
2208                 state->async.fn = verify_recmode_normal_callback;
2209                 state->async.private_data = rmdata;
2210
2211                 /* one more control to wait for to complete */
2212                 rmdata->count++;
2213         }
2214
2215
2216         /* now wait for up to the maximum number of seconds allowed
2217            or until all nodes we expect a response from has replied
2218         */
2219         while (rmdata->count > 0) {
2220                 event_loop_once(ctdb->ev);
2221         }
2222
2223         status = rmdata->status;
2224         talloc_free(mem_ctx);
2225         return status;
2226 }
2227
2228
/*
  state shared between verify_recmaster() and
  verify_recmaster_callback() while the getrecmaster controls are in
  flight
 */
struct verify_recmaster_data {
        struct ctdb_recoverd *rec;      /* passed to ctdb_set_culprit() for a disagreeing node */
        uint32_t count;                 /* controls sent that have not completed yet */
        uint32_t pnn;                   /* the recmaster every node is expected to report */
        enum monitor_result status;     /* combined result; starts out as MONITOR_OK */
};
2235
2236 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2237 {
2238         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2239
2240
2241         /* one more node has responded with recmaster data*/
2242         rmdata->count--;
2243
2244         /* if we failed to get the recmaster, then return an error and let
2245            the main loop try again.
2246         */
2247         if (state->state != CTDB_CONTROL_DONE) {
2248                 if (rmdata->status == MONITOR_OK) {
2249                         rmdata->status = MONITOR_FAILED;
2250                 }
2251                 return;
2252         }
2253
2254         /* if we got a response, then the recmaster will be stored in the
2255            status field
2256         */
2257         if (state->status != rmdata->pnn) {
2258                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2259                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2260                 rmdata->status = MONITOR_ELECTION_NEEDED;
2261         }
2262
2263         return;
2264 }
2265
2266
2267 /* verify that all nodes agree that we are the recmaster */
2268 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2269 {
2270         struct ctdb_context *ctdb = rec->ctdb;
2271         struct verify_recmaster_data *rmdata;
2272         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2273         struct ctdb_client_control_state *state;
2274         enum monitor_result status;
2275         int j;
2276         
2277         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2278         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2279         rmdata->rec    = rec;
2280         rmdata->count  = 0;
2281         rmdata->pnn    = pnn;
2282         rmdata->status = MONITOR_OK;
2283
2284         /* loop over all active nodes and send an async getrecmaster call to 
2285            them*/
2286         for (j=0; j<nodemap->num; j++) {
2287                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2288                         continue;
2289                 }
2290                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2291                                         CONTROL_TIMEOUT(),
2292                                         nodemap->nodes[j].pnn);
2293                 if (state == NULL) {
2294                         /* we failed to send the control, treat this as 
2295                            an error and try again next iteration
2296                         */                      
2297                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2298                         talloc_free(mem_ctx);
2299                         return MONITOR_FAILED;
2300                 }
2301
2302                 /* set up the callback functions */
2303                 state->async.fn = verify_recmaster_callback;
2304                 state->async.private_data = rmdata;
2305
2306                 /* one more control to wait for to complete */
2307                 rmdata->count++;
2308         }
2309
2310
2311         /* now wait for up to the maximum number of seconds allowed
2312            or until all nodes we expect a response from has replied
2313         */
2314         while (rmdata->count > 0) {
2315                 event_loop_once(ctdb->ev);
2316         }
2317
2318         status = rmdata->status;
2319         talloc_free(mem_ctx);
2320         return status;
2321 }
2322
2323
2324 /* called to check that the allocation of public ip addresses is ok.
2325 */
2326 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2327 {
2328         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2329         struct ctdb_all_public_ips *ips = NULL;
2330         struct ctdb_uptime *uptime1 = NULL;
2331         struct ctdb_uptime *uptime2 = NULL;
2332         int ret, j;
2333
2334         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2335                                 CTDB_CURRENT_NODE, &uptime1);
2336         if (ret != 0) {
2337                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2338                 talloc_free(mem_ctx);
2339                 return -1;
2340         }
2341
2342         /* read the ip allocation from the local node */
2343         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2344         if (ret != 0) {
2345                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2346                 talloc_free(mem_ctx);
2347                 return -1;
2348         }
2349
2350         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2351                                 CTDB_CURRENT_NODE, &uptime2);
2352         if (ret != 0) {
2353                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2354                 talloc_free(mem_ctx);
2355                 return -1;
2356         }
2357
2358         /* skip the check if the startrecovery time has changed */
2359         if (timeval_compare(&uptime1->last_recovery_started,
2360                             &uptime2->last_recovery_started) != 0) {
2361                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2362                 talloc_free(mem_ctx);
2363                 return 0;
2364         }
2365
2366         /* skip the check if the endrecovery time has changed */
2367         if (timeval_compare(&uptime1->last_recovery_finished,
2368                             &uptime2->last_recovery_finished) != 0) {
2369                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2370                 talloc_free(mem_ctx);
2371                 return 0;
2372         }
2373
2374         /* skip the check if we have started but not finished recovery */
2375         if (timeval_compare(&uptime1->last_recovery_finished,
2376                             &uptime1->last_recovery_started) != 1) {
2377                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2378                 talloc_free(mem_ctx);
2379
2380                 return 0;
2381         }
2382
2383         /* verify that we have the ip addresses we should have
2384            and we dont have ones we shouldnt have.
2385            if we find an inconsistency we set recmode to
2386            active on the local node and wait for the recmaster
2387            to do a full blown recovery
2388         */
2389         for (j=0; j<ips->num; j++) {
2390                 if (ips->ips[j].pnn == pnn) {
2391                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2392                                 struct takeover_run_reply rd;
2393                                 TDB_DATA data;
2394
2395                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2396                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2397
2398                                 rd.pnn   = ctdb->pnn;
2399                                 rd.srvid = 0;
2400                                 data.dptr = (uint8_t *)&rd;
2401                                 data.dsize = sizeof(rd);
2402
2403                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2404                                 if (ret != 0) {
2405                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2406                                 }
2407                         }
2408                 } else {
2409                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2410                                 struct takeover_run_reply rd;
2411                                 TDB_DATA data;
2412
2413                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2414                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2415
2416                                 rd.pnn   = ctdb->pnn;
2417                                 rd.srvid = 0;
2418                                 data.dptr = (uint8_t *)&rd;
2419                                 data.dsize = sizeof(rd);
2420
2421                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2422                                 if (ret != 0) {
2423                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2424                                 }
2425                         }
2426                 }
2427         }
2428
2429         talloc_free(mem_ctx);
2430         return 0;
2431 }
2432
2433
2434 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2435 {
2436         struct ctdb_node_map **remote_nodemaps = callback_data;
2437
2438         if (node_pnn >= ctdb->num_nodes) {
2439                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2440                 return;
2441         }
2442
2443         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2444
2445 }
2446
2447 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2448         struct ctdb_node_map *nodemap,
2449         struct ctdb_node_map **remote_nodemaps)
2450 {
2451         uint32_t *nodes;
2452
2453         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2454         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2455                                         nodes, 0,
2456                                         CONTROL_TIMEOUT(), false, tdb_null,
2457                                         async_getnodemap_callback,
2458                                         NULL,
2459                                         remote_nodemaps) != 0) {
2460                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2461
2462                 return -1;
2463         }
2464
2465         return 0;
2466 }
2467
/* Outcome of a recovery lock check.  The child process reports its result
   to the parent as a single byte holding one of these values, so the
   numeric values are spelled out explicitly.
*/
enum reclock_child_status {
        RECLOCK_CHECKING = 0,   /* check started, no result yet */
        RECLOCK_OK       = 1,   /* child read the reclock file successfully */
        RECLOCK_FAILED   = 2,   /* child reported an error, or the pipe read failed */
        RECLOCK_TIMEOUT  = 3    /* the timeout event fired before a result arrived */
};

/* Book-keeping for one in-flight recovery lock check. */
struct ctdb_check_reclock_state {
        struct ctdb_context *ctdb;
        /* when the check was started; used to report the lock latency */
        struct timeval start_time;
        /* pipe between child and parent; a closed end is stored as -1 */
        int fd[2];
        /* pid of the forked child performing the read */
        pid_t child;
        /* timeout event guarding against a hung child */
        struct timed_event *te;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        enum reclock_child_status status;
};
2478
2479 /* when we free the reclock state we must kill any child process.
2480 */
2481 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2482 {
2483         struct ctdb_context *ctdb = state->ctdb;
2484
2485         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2486
2487         if (state->fd[0] != -1) {
2488                 close(state->fd[0]);
2489                 state->fd[0] = -1;
2490         }
2491         if (state->fd[1] != -1) {
2492                 close(state->fd[1]);
2493                 state->fd[1] = -1;
2494         }
2495         kill(state->child, SIGKILL);
2496         return 0;
2497 }
2498
2499 /*
2500   called if our check_reclock child times out. this would happen if
2501   i/o to the reclock file blocks.
2502  */
2503 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2504                                          struct timeval t, void *private_data)
2505 {
2506         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2507                                            struct ctdb_check_reclock_state);
2508
2509         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2510         state->status = RECLOCK_TIMEOUT;
2511 }
2512
2513 /* this is called when the child process has completed checking the reclock
2514    file and has written data back to us through the pipe.
2515 */
2516 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2517                              uint16_t flags, void *private_data)
2518 {
2519         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2520                                              struct ctdb_check_reclock_state);
2521         char c = 0;
2522         int ret;
2523
2524         /* we got a response from our child process so we can abort the
2525            timeout.
2526         */
2527         talloc_free(state->te);
2528         state->te = NULL;
2529
2530         ret = read(state->fd[0], &c, 1);
2531         if (ret != 1 || c != RECLOCK_OK) {
2532                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2533                 state->status = RECLOCK_FAILED;
2534
2535                 return;
2536         }
2537
2538         state->status = RECLOCK_OK;
2539         return;
2540 }
2541
2542 static int check_recovery_lock(struct ctdb_context *ctdb)
2543 {
2544         int ret;
2545         struct ctdb_check_reclock_state *state;
2546         pid_t parent = getpid();
2547
2548         if (ctdb->recovery_lock_fd == -1) {
2549                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2550                 return -1;
2551         }
2552
2553         state = talloc(ctdb, struct ctdb_check_reclock_state);
2554         CTDB_NO_MEMORY(ctdb, state);
2555
2556         state->ctdb = ctdb;
2557         state->start_time = timeval_current();
2558         state->status = RECLOCK_CHECKING;
2559         state->fd[0] = -1;
2560         state->fd[1] = -1;
2561
2562         ret = pipe(state->fd);
2563         if (ret != 0) {
2564                 talloc_free(state);
2565                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2566                 return -1;
2567         }
2568
2569         state->child = fork();
2570         if (state->child == (pid_t)-1) {
2571                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2572                 close(state->fd[0]);
2573                 state->fd[0] = -1;
2574                 close(state->fd[1]);
2575                 state->fd[1] = -1;
2576                 talloc_free(state);
2577                 return -1;
2578         }
2579
2580         if (state->child == 0) {
2581                 char cc = RECLOCK_OK;
2582                 close(state->fd[0]);
2583                 state->fd[0] = -1;
2584
2585                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2586                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2587                         cc = RECLOCK_FAILED;
2588                 }
2589
2590                 write(state->fd[1], &cc, 1);
2591                 /* make sure we die when our parent dies */
2592                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2593                         sleep(5);
2594                         write(state->fd[1], &cc, 1);
2595                 }
2596                 _exit(0);
2597         }
2598         close(state->fd[1]);
2599         state->fd[1] = -1;
2600         set_close_on_exec(state->fd[0]);
2601
2602         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2603
2604         talloc_set_destructor(state, check_reclock_destructor);
2605
2606         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2607                                     ctdb_check_reclock_timeout, state);
2608         if (state->te == NULL) {
2609                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2610                 talloc_free(state);
2611                 return -1;
2612         }
2613
2614         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2615                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2616                                 reclock_child_handler,
2617                                 (void *)state);
2618
2619         if (state->fde == NULL) {
2620                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2621                 talloc_free(state);
2622                 return -1;
2623         }
2624
2625         while (state->status == RECLOCK_CHECKING) {
2626                 event_loop_once(ctdb->ev);
2627         }
2628
2629         if (state->status == RECLOCK_FAILED) {
2630                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2631                 close(ctdb->recovery_lock_fd);
2632                 ctdb->recovery_lock_fd = -1;
2633                 talloc_free(state);
2634                 return -1;
2635         }
2636
2637         talloc_free(state);
2638         return 0;
2639 }
2640
2641 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2642 {
2643         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2644         const char *reclockfile;
2645
2646         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2647                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2648                 talloc_free(tmp_ctx);
2649                 return -1;      
2650         }
2651
2652         if (reclockfile == NULL) {
2653                 if (ctdb->recovery_lock_file != NULL) {
2654                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2655                         talloc_free(ctdb->recovery_lock_file);
2656                         ctdb->recovery_lock_file = NULL;
2657                         if (ctdb->recovery_lock_fd != -1) {
2658                                 close(ctdb->recovery_lock_fd);
2659                                 ctdb->recovery_lock_fd = -1;
2660                         }
2661                 }
2662                 ctdb->tunable.verify_recovery_lock = 0;
2663                 talloc_free(tmp_ctx);
2664                 return 0;
2665         }
2666
2667         if (ctdb->recovery_lock_file == NULL) {
2668                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2669                 if (ctdb->recovery_lock_fd != -1) {
2670                         close(ctdb->recovery_lock_fd);
2671                         ctdb->recovery_lock_fd = -1;
2672                 }
2673                 talloc_free(tmp_ctx);
2674                 return 0;
2675         }
2676
2677
2678         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2679                 talloc_free(tmp_ctx);
2680                 return 0;
2681         }
2682
2683         talloc_free(ctdb->recovery_lock_file);
2684         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2685         ctdb->tunable.verify_recovery_lock = 0;
2686         if (ctdb->recovery_lock_fd != -1) {
2687                 close(ctdb->recovery_lock_fd);
2688                 ctdb->recovery_lock_fd = -1;
2689         }
2690
2691         talloc_free(tmp_ctx);
2692         return 0;
2693 }
2694                 
2695 /*
2696   the main monitoring loop
2697  */
2698 static void monitor_cluster(struct ctdb_context *ctdb)
2699 {
2700         uint32_t pnn;
2701         TALLOC_CTX *mem_ctx=NULL;
2702         struct ctdb_node_map *nodemap=NULL;
2703         struct ctdb_node_map *recmaster_nodemap=NULL;
2704         struct ctdb_node_map **remote_nodemaps=NULL;
2705         struct ctdb_vnn_map *vnnmap=NULL;
2706         struct ctdb_vnn_map *remote_vnnmap=NULL;
2707         int32_t debug_level;
2708         int i, j, ret;
2709         struct ctdb_recoverd *rec;
2710
2711         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2712
2713         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2714         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2715
2716         rec->ctdb = ctdb;
2717
2718         rec->priority_time = timeval_current();
2719
2720         /* register a message port for sending memory dumps */
2721         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2722
2723         /* register a message port for recovery elections */
2724         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2725
2726         /* when nodes are disabled/enabled */
2727         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2728
2729         /* when we are asked to puch out a flag change */
2730         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2731
2732         /* register a message port for vacuum fetch */
2733         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2734
2735         /* register a message port for reloadnodes  */
2736         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2737
2738         /* register a message port for performing a takeover run */
2739         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2740
2741         /* register a message port for disabling the ip check for a short while */
2742         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2743
2744 again:
2745         if (mem_ctx) {
2746                 talloc_free(mem_ctx);
2747                 mem_ctx = NULL;
2748         }
2749         mem_ctx = talloc_new(ctdb);
2750         if (!mem_ctx) {
2751                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2752                 exit(-1);
2753         }
2754
2755         /* we only check for recovery once every second */
2756         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2757
2758         /* verify that the main daemon is still running */
2759         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2760                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2761                 exit(-1);
2762         }
2763
2764         /* ping the local daemon to tell it we are alive */
2765         ctdb_ctrl_recd_ping(ctdb);
2766
2767         if (rec->election_timeout) {
2768                 /* an election is in progress */
2769                 goto again;
2770         }
2771
2772         /* read the debug level from the parent and update locally */
2773         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2774         if (ret !=0) {
2775                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2776                 goto again;
2777         }
2778         LogLevel = debug_level;
2779
2780
2781         /* We must check if we need to ban a node here but we want to do this
2782            as early as possible so we dont wait until we have pulled the node
2783            map from the local node. thats why we have the hardcoded value 20
2784         */
2785         for (i=0; i<ctdb->num_nodes; i++) {
2786                 struct ctdb_banning_state *ban_state;
2787
2788                 if (ctdb->nodes[i]->ban_state == NULL) {
2789                         continue;
2790                 }
2791                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2792                 if (ban_state->count < 20) {
2793                         continue;
2794                 }
2795                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2796                         ctdb->nodes[i]->pnn, ban_state->count,
2797                         ctdb->tunable.recovery_ban_period));
2798                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2799                 ban_state->count = 0;
2800         }
2801
2802         /* get relevant tunables */
2803         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2804         if (ret != 0) {
2805                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2806                 goto again;
2807         }
2808
2809         /* get the current recovery lock file from the server */
2810         if (update_recovery_lock_file(ctdb) != 0) {
2811                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2812                 goto again;
2813         }
2814
2815         /* Make sure that if recovery lock verification becomes disabled when
2816            we close the file
2817         */
2818         if (ctdb->tunable.verify_recovery_lock == 0) {
2819                 if (ctdb->recovery_lock_fd != -1) {
2820                         close(ctdb->recovery_lock_fd);
2821                         ctdb->recovery_lock_fd = -1;
2822                 }
2823         }
2824
2825         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2826         if (pnn == (uint32_t)-1) {
2827                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2828                 goto again;
2829         }
2830
2831         /* get the vnnmap */
2832         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2833         if (ret != 0) {
2834                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2835                 goto again;
2836         }
2837
2838
2839         /* get number of nodes */
2840         if (rec->nodemap) {
2841                 talloc_free(rec->nodemap);
2842                 rec->nodemap = NULL;
2843                 nodemap=NULL;
2844         }
2845         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2846         if (ret != 0) {
2847                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2848                 goto again;
2849         }
2850         nodemap = rec->nodemap;
2851
2852         /* check which node is the recovery master */
2853         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2854         if (ret != 0) {
2855                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2856                 goto again;
2857         }
2858
2859         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2860         if (rec->recmaster != pnn) {
2861                 if (rec->ip_reallocate_ctx != NULL) {
2862                         talloc_free(rec->ip_reallocate_ctx);
2863                         rec->ip_reallocate_ctx = NULL;
2864                         rec->reallocate_callers = NULL;
2865                 }
2866         }
2867         /* if there are takeovers requested, perform it and notify the waiters */
2868         if (rec->reallocate_callers) {
2869                 process_ipreallocate_requests(ctdb, rec);
2870         }
2871
2872         if (rec->recmaster == (uint32_t)-1) {
2873                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2874                 force_election(rec, pnn, nodemap);
2875                 goto again;
2876         }
2877
2878
2879         /* if the local daemon is STOPPED, we verify that the databases are
2880            also frozen and thet the recmode is set to active 
2881         */
2882         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2883                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2884                 if (ret != 0) {
2885                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2886                 }
2887                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2888                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2889
2890                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2891                         if (ret != 0) {
2892                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2893                                 goto again;
2894                         }
2895                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2896                         if (ret != 0) {
2897                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2898
2899                                 goto again;
2900                         }
2901                         goto again;
2902                 }
2903         }
2904         /* If the local node is stopped, verify we are not the recmaster 
2905            and yield this role if so
2906         */
2907         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2908                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2909                 force_election(rec, pnn, nodemap);
2910                 goto again;
2911         }
2912         
2913         /* check that we (recovery daemon) and the local ctdb daemon
2914            agrees on whether we are banned or not
2915         */
2916 //qqq
2917
2918         /* remember our own node flags */
2919         rec->node_flags = nodemap->nodes[pnn].flags;
2920
2921         /* count how many active nodes there are */
2922         rec->num_active    = 0;
2923         rec->num_connected = 0;
2924         for (i=0; i<nodemap->num; i++) {
2925                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2926                         rec->num_active++;
2927                 }
2928                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2929                         rec->num_connected++;
2930                 }
2931         }
2932
2933
2934         /* verify that the recmaster node is still active */
2935         for (j=0; j<nodemap->num; j++) {
2936                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2937                         break;
2938                 }
2939         }
2940
2941         if (j == nodemap->num) {
2942                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2943                 force_election(rec, pnn, nodemap);
2944                 goto again;
2945         }
2946
2947         /* if recovery master is disconnected we must elect a new recmaster */
2948         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2949                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2950                 force_election(rec, pnn, nodemap);
2951                 goto again;
2952         }
2953
2954         /* grap the nodemap from the recovery master to check if it is banned */
2955         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2956                                    mem_ctx, &recmaster_nodemap);
2957         if (ret != 0) {
2958                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2959                           nodemap->nodes[j].pnn));
2960                 goto again;
2961         }
2962
2963
2964         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2965                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2966                 force_election(rec, pnn, nodemap);
2967                 goto again;
2968         }
2969
2970
2971         /* verify that we have all ip addresses we should have and we dont
2972          * have addresses we shouldnt have.
2973          */ 
2974         if (ctdb->do_checkpublicip) {
2975                 if (rec->ip_check_disable_ctx == NULL) {
2976                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
2977                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2978                         }
2979                 }
2980         }
2981
2982
2983         /* if we are not the recmaster then we do not need to check
2984            if recovery is needed
2985          */
2986         if (pnn != rec->recmaster) {
2987                 goto again;
2988         }
2989
2990
2991         /* ensure our local copies of flags are right */
2992         ret = update_local_flags(rec, nodemap);
2993         if (ret == MONITOR_ELECTION_NEEDED) {
2994                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2995                 force_election(rec, pnn, nodemap);
2996                 goto again;
2997         }
2998         if (ret != MONITOR_OK) {
2999                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3000                 goto again;
3001         }
3002
3003         /* update the list of public ips that a node can handle for
3004            all connected nodes
3005         */
3006         if (ctdb->num_nodes != nodemap->num) {
3007                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3008                 reload_nodes_file(ctdb);
3009                 goto again;
3010         }
3011         for (j=0; j<nodemap->num; j++) {
3012                 /* release any existing data */
3013                 if (ctdb->nodes[j]->public_ips) {
3014                         talloc_free(ctdb->nodes[j]->public_ips);
3015                         ctdb->nodes[j]->public_ips = NULL;
3016                 }
3017
3018                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3019                         continue;
3020                 }
3021
3022                 /* grab a new shiny list of public ips from the node */
3023                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
3024                         ctdb->nodes[j]->pnn, 
3025                         ctdb->nodes,
3026                         &ctdb->nodes[j]->public_ips)) {
3027                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
3028                                 ctdb->nodes[j]->pnn));
3029                         goto again;
3030                 }
3031         }
3032
3033
3034         /* verify that all active nodes agree that we are the recmaster */
3035         switch (verify_recmaster(rec, nodemap, pnn)) {
3036         case MONITOR_RECOVERY_NEEDED:
3037                 /* can not happen */
3038                 goto again;
3039         case MONITOR_ELECTION_NEEDED:
3040                 force_election(rec, pnn, nodemap);
3041                 goto again;
3042         case MONITOR_OK:
3043                 break;
3044         case MONITOR_FAILED:
3045                 goto again;
3046         }
3047
3048
3049         if (rec->need_recovery) {
3050                 /* a previous recovery didn't finish */
3051                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3052                 goto again;             
3053         }
3054
3055         /* verify that all active nodes are in normal mode 
3056            and not in recovery mode 
3057         */
3058         switch (verify_recmode(ctdb, nodemap)) {
3059         case MONITOR_RECOVERY_NEEDED:
3060                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3061                 goto again;
3062         case MONITOR_FAILED:
3063                 goto again;
3064         case MONITOR_ELECTION_NEEDED:
3065                 /* can not happen */
3066         case MONITOR_OK:
3067                 break;
3068         }
3069
3070
3071         if (ctdb->tunable.verify_recovery_lock != 0) {
3072                 /* we should have the reclock - check its not stale */
3073                 ret = check_recovery_lock(ctdb);
3074                 if (ret != 0) {
3075                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3076                         ctdb_set_culprit(rec, ctdb->pnn);
3077                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3078                         goto again;
3079                 }
3080         }
3081
3082         /* get the nodemap for all active remote nodes
3083          */
3084         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3085         if (remote_nodemaps == NULL) {
3086                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3087                 goto again;
3088         }
3089         for(i=0; i<nodemap->num; i++) {
3090                 remote_nodemaps[i] = NULL;
3091         }
3092         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3093                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3094                 goto again;
3095         } 
3096
3097         /* verify that all other nodes have the same nodemap as we have
3098         */
3099         for (j=0; j<nodemap->num; j++) {
3100                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3101                         continue;
3102                 }
3103
3104                 if (remote_nodemaps[j] == NULL) {
3105                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3106                         ctdb_set_culprit(rec, j);
3107
3108                         goto again;
3109                 }
3110
3111                 /* if the nodes disagree on how many nodes there are
3112                    then this is a good reason to try recovery
3113                  */
3114                 if (remote_nodemaps[j]->num != nodemap->num) {
3115                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3116                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3117                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3118                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3119                         goto again;
3120                 }
3121
3122                 /* if the nodes disagree on which nodes exist and are
3123                    active, then that is also a good reason to do recovery
3124                  */
3125                 for (i=0;i<nodemap->num;i++) {
3126                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3127                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3128                                           nodemap->nodes[j].pnn, i, 
3129                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3130                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3131                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3132                                             vnnmap);
3133                                 goto again;
3134                         }
3135                 }
3136
3137                 /* verify the flags are consistent
3138                 */
3139                 for (i=0; i<nodemap->num; i++) {
3140                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3141                                 continue;
3142                         }
3143                         
3144                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3145                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3146                                   nodemap->nodes[j].pnn, 
3147                                   nodemap->nodes[i].pnn, 
3148                                   remote_nodemaps[j]->nodes[i].flags,
3149                                   nodemap->nodes[j].flags));
3150                                 if (i == j) {
3151                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3152                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3153                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3154                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3155                                                     vnnmap);
3156                                         goto again;
3157                                 } else {
3158                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3159                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3160                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3161                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3162                                                     vnnmap);
3163                                         goto again;
3164                                 }
3165                         }
3166                 }
3167         }
3168
3169
3170         /* there better be the same number of lmasters in the vnn map
3171            as there are active nodes or we will have to do a recovery
3172          */
3173         if (vnnmap->size != rec->num_active) {
3174                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3175                           vnnmap->size, rec->num_active));
3176                 ctdb_set_culprit(rec, ctdb->pnn);
3177                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3178                 goto again;
3179         }
3180
3181         /* verify that all active nodes in the nodemap also exist in 
3182            the vnnmap.
3183          */
3184         for (j=0; j<nodemap->num; j++) {
3185                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3186                         continue;
3187                 }
3188                 if (nodemap->nodes[j].pnn == pnn) {
3189                         continue;
3190                 }
3191
3192                 for (i=0; i<vnnmap->size; i++) {
3193                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3194                                 break;
3195                         }
3196                 }
3197                 if (i == vnnmap->size) {
3198                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3199                                   nodemap->nodes[j].pnn));
3200                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3201                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3202                         goto again;
3203                 }
3204         }
3205
3206         
3207         /* verify that all other nodes have the same vnnmap
3208            and are from the same generation
3209          */
3210         for (j=0; j<nodemap->num; j++) {
3211                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3212                         continue;
3213                 }
3214                 if (nodemap->nodes[j].pnn == pnn) {
3215                         continue;
3216                 }
3217
3218                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3219                                           mem_ctx, &remote_vnnmap);
3220                 if (ret != 0) {
3221                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3222                                   nodemap->nodes[j].pnn));
3223                         goto again;
3224                 }
3225
3226                 /* verify the vnnmap generation is the same */
3227                 if (vnnmap->generation != remote_vnnmap->generation) {
3228                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3229                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3230                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3231                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3232                         goto again;
3233                 }
3234
3235                 /* verify the vnnmap size is the same */
3236                 if (vnnmap->size != remote_vnnmap->size) {
3237                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3238                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3239                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3240                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3241                         goto again;
3242                 }
3243
3244                 /* verify the vnnmap is the same */
3245                 for (i=0;i<vnnmap->size;i++) {
3246                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3247                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3248                                           nodemap->nodes[j].pnn));
3249                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3250                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3251                                             vnnmap);
3252                                 goto again;
3253                         }
3254                 }
3255         }
3256
3257         /* we might need to change who has what IP assigned */
3258         if (rec->need_takeover_run) {
3259                 rec->need_takeover_run = false;
3260
3261                 /* execute the "startrecovery" event script on all nodes */
3262                 ret = run_startrecovery_eventscript(rec, nodemap);
3263                 if (ret!=0) {
3264                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3265                         ctdb_set_culprit(rec, ctdb->pnn);
3266                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3267                 }
3268
3269                 ret = ctdb_takeover_run(ctdb, nodemap);
3270                 if (ret != 0) {
3271                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3272                         ctdb_set_culprit(rec, ctdb->pnn);
3273                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3274                 }
3275
3276                 /* execute the "recovered" event script on all nodes */
3277                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3278 #if 0
3279 // we cant check whether the event completed successfully
3280 // since this script WILL fail if the node is in recovery mode
3281 // and if that race happens, the code here would just cause a second
3282 // cascading recovery.
3283                 if (ret!=0) {
3284                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3285                         ctdb_set_culprit(rec, ctdb->pnn);
3286                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3287                 }
3288 #endif
3289         }
3290
3291
3292         goto again;
3293
3294 }
3295
3296 /*
3297   event handler for when the main ctdbd dies
3298  */
3299 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3300                                  uint16_t flags, void *private_data)
3301 {
3302         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3303         _exit(1);
3304 }
3305
3306 /*
3307   called regularly to verify that the recovery daemon is still running
3308  */
3309 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3310                               struct timeval yt, void *p)
3311 {
3312         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3313
3314         if (kill(ctdb->recoverd_pid, 0) != 0) {
3315                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3316
3317                 ctdb_stop_recoverd(ctdb);
3318                 ctdb_stop_keepalive(ctdb);
3319                 ctdb_stop_monitoring(ctdb);
3320                 ctdb_release_all_ips(ctdb);
3321                 if (ctdb->methods != NULL) {
3322                         ctdb->methods->shutdown(ctdb);
3323                 }
3324                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3325
3326                 exit(10);       
3327         }
3328
3329         event_add_timed(ctdb->ev, ctdb, 
3330                         timeval_current_ofs(30, 0),
3331                         ctdb_check_recd, ctdb);
3332 }
3333
3334 static void recd_sig_child_handler(struct event_context *ev,
3335         struct signal_event *se, int signum, int count,
3336         void *dont_care, 
3337         void *private_data)
3338 {
3339 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3340         int status;
3341         pid_t pid = -1;
3342
3343         while (pid != 0) {
3344                 pid = waitpid(-1, &status, WNOHANG);
3345                 if (pid == -1) {
3346                         if (errno != ECHILD) {
3347                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3348                         }
3349                         return;
3350                 }
3351                 if (pid > 0) {
3352                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3353                 }
3354         }
3355 }
3356
3357 /*
3358   startup the recovery daemon as a child of the main ctdb daemon
3359  */
3360 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3361 {
3362         int fd[2];
3363         struct signal_event *se;
3364
3365         if (pipe(fd) != 0) {
3366                 return -1;
3367         }
3368
3369         ctdb->ctdbd_pid = getpid();
3370
3371         ctdb->recoverd_pid = fork();
3372         if (ctdb->recoverd_pid == -1) {
3373                 return -1;
3374         }
3375         
3376         if (ctdb->recoverd_pid != 0) {
3377                 close(fd[0]);
3378                 event_add_timed(ctdb->ev, ctdb, 
3379                                 timeval_current_ofs(30, 0),
3380                                 ctdb_check_recd, ctdb);
3381                 return 0;
3382         }
3383
3384         close(fd[1]);
3385
3386         srandom(getpid() ^ time(NULL));
3387
3388         if (switch_from_server_to_client(ctdb) != 0) {
3389                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3390                 exit(1);
3391         }
3392
3393         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3394
3395         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3396                      ctdb_recoverd_parent, &fd[0]);     
3397
3398         /* set up a handler to pick up sigchld */
3399         se = event_add_signal(ctdb->ev, ctdb,
3400                                      SIGCHLD, 0,
3401                                      recd_sig_child_handler,
3402                                      ctdb);
3403         if (se == NULL) {
3404                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3405                 exit(1);
3406         }
3407
3408         monitor_cluster(ctdb);
3409
3410         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3411         return -1;
3412 }
3413
3414 /*
3415   shutdown the recovery daemon
3416  */
3417 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3418 {
3419         if (ctdb->recoverd_pid == 0) {
3420                 return;
3421         }
3422
3423         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3424         kill(ctdb->recoverd_pid, SIGTERM);
3425 }