Merge commit 'origin/master'
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit > ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, CONTROL_TIMEOUT(),
236                                         false, tdb_null,
237                                         async_getcap_callback, NULL,
238                                         NULL) != 0) {
239                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
240                 talloc_free(tmp_ctx);
241                 return -1;
242         }
243
244         talloc_free(tmp_ctx);
245         return 0;
246 }
247
248 /*
249   change recovery mode on all nodes
250  */
251 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
252 {
253         TDB_DATA data;
254         uint32_t *nodes;
255         TALLOC_CTX *tmp_ctx;
256
257         tmp_ctx = talloc_new(ctdb);
258         CTDB_NO_MEMORY(ctdb, tmp_ctx);
259
260         /* freeze all nodes */
261         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
262         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
263                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
264                                                 nodes, CONTROL_TIMEOUT(),
265                                                 false, tdb_null,
266                                                 NULL, NULL,
267                                                 NULL) != 0) {
268                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
269                         talloc_free(tmp_ctx);
270                         return -1;
271                 }
272         }
273
274
275         data.dsize = sizeof(uint32_t);
276         data.dptr = (unsigned char *)&rec_mode;
277
278         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
279                                         nodes, CONTROL_TIMEOUT(),
280                                         false, data,
281                                         NULL, NULL,
282                                         NULL) != 0) {
283                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
284                 talloc_free(tmp_ctx);
285                 return -1;
286         }
287
288         talloc_free(tmp_ctx);
289         return 0;
290 }
291
292 /*
293   change recovery master on all node
294  */
295 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
296 {
297         TDB_DATA data;
298         TALLOC_CTX *tmp_ctx;
299         uint32_t *nodes;
300
301         tmp_ctx = talloc_new(ctdb);
302         CTDB_NO_MEMORY(ctdb, tmp_ctx);
303
304         data.dsize = sizeof(uint32_t);
305         data.dptr = (unsigned char *)&pnn;
306
307         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
308         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
309                                         nodes,
310                                         CONTROL_TIMEOUT(), false, data,
311                                         NULL, NULL,
312                                         NULL) != 0) {
313                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
314                 talloc_free(tmp_ctx);
315                 return -1;
316         }
317
318         talloc_free(tmp_ctx);
319         return 0;
320 }
321
322
323 /*
324   ensure all other nodes have attached to any databases that we have
325  */
326 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
327                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
328 {
329         int i, j, db, ret;
330         struct ctdb_dbid_map *remote_dbmap;
331
332         /* verify that all other nodes have all our databases */
333         for (j=0; j<nodemap->num; j++) {
334                 /* we dont need to ourself ourselves */
335                 if (nodemap->nodes[j].pnn == pnn) {
336                         continue;
337                 }
338                 /* dont check nodes that are unavailable */
339                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
340                         continue;
341                 }
342
343                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
344                                          mem_ctx, &remote_dbmap);
345                 if (ret != 0) {
346                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
347                         return -1;
348                 }
349
350                 /* step through all local databases */
351                 for (db=0; db<dbmap->num;db++) {
352                         const char *name;
353
354
355                         for (i=0;i<remote_dbmap->num;i++) {
356                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
357                                         break;
358                                 }
359                         }
360                         /* the remote node already have this database */
361                         if (i!=remote_dbmap->num) {
362                                 continue;
363                         }
364                         /* ok so we need to create this database */
365                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
366                                             mem_ctx, &name);
367                         if (ret != 0) {
368                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
369                                 return -1;
370                         }
371                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
372                                            mem_ctx, name, dbmap->dbs[db].persistent);
373                         if (ret != 0) {
374                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
375                                 return -1;
376                         }
377                 }
378         }
379
380         return 0;
381 }
382
383
384 /*
385   ensure we are attached to any databases that anyone else is attached to
386  */
387 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
388                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
389 {
390         int i, j, db, ret;
391         struct ctdb_dbid_map *remote_dbmap;
392
393         /* verify that we have all database any other node has */
394         for (j=0; j<nodemap->num; j++) {
395                 /* we dont need to ourself ourselves */
396                 if (nodemap->nodes[j].pnn == pnn) {
397                         continue;
398                 }
399                 /* dont check nodes that are unavailable */
400                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
401                         continue;
402                 }
403
404                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
405                                          mem_ctx, &remote_dbmap);
406                 if (ret != 0) {
407                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
408                         return -1;
409                 }
410
411                 /* step through all databases on the remote node */
412                 for (db=0; db<remote_dbmap->num;db++) {
413                         const char *name;
414
415                         for (i=0;i<(*dbmap)->num;i++) {
416                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
417                                         break;
418                                 }
419                         }
420                         /* we already have this db locally */
421                         if (i!=(*dbmap)->num) {
422                                 continue;
423                         }
424                         /* ok so we need to create this database and
425                            rebuild dbmap
426                          */
427                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
428                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
429                         if (ret != 0) {
430                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
431                                           nodemap->nodes[j].pnn));
432                                 return -1;
433                         }
434                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
435                                            remote_dbmap->dbs[db].persistent);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   pull the remote database contents from one node into the recdb
454  */
455 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
456                                     struct tdb_wrap *recdb, uint32_t dbid)
457 {
458         int ret;
459         TDB_DATA outdata;
460         struct ctdb_marshall_buffer *reply;
461         struct ctdb_rec_data *rec;
462         int i;
463         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
464
465         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
466                                CONTROL_TIMEOUT(), &outdata);
467         if (ret != 0) {
468                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
469                 talloc_free(tmp_ctx);
470                 return -1;
471         }
472
473         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
474
475         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
476                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
477                 talloc_free(tmp_ctx);
478                 return -1;
479         }
480         
481         rec = (struct ctdb_rec_data *)&reply->data[0];
482         
483         for (i=0;
484              i<reply->count;
485              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
486                 TDB_DATA key, data;
487                 struct ctdb_ltdb_header *hdr;
488                 TDB_DATA existing;
489                 
490                 key.dptr = &rec->data[0];
491                 key.dsize = rec->keylen;
492                 data.dptr = &rec->data[key.dsize];
493                 data.dsize = rec->datalen;
494                 
495                 hdr = (struct ctdb_ltdb_header *)data.dptr;
496
497                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
498                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
499                         talloc_free(tmp_ctx);
500                         return -1;
501                 }
502
503                 /* fetch the existing record, if any */
504                 existing = tdb_fetch(recdb->tdb, key);
505                 
506                 if (existing.dptr != NULL) {
507                         struct ctdb_ltdb_header header;
508                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
509                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
510                                          (unsigned)existing.dsize, srcnode));
511                                 free(existing.dptr);
512                                 talloc_free(tmp_ctx);
513                                 return -1;
514                         }
515                         header = *(struct ctdb_ltdb_header *)existing.dptr;
516                         free(existing.dptr);
517                         if (!(header.rsn < hdr->rsn ||
518                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
519                                 continue;
520                         }
521                 }
522                 
523                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
524                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
525                         talloc_free(tmp_ctx);
526                         return -1;                              
527                 }
528         }
529
530         talloc_free(tmp_ctx);
531
532         return 0;
533 }
534
535 /*
536   pull all the remote database contents into the recdb
537  */
538 static int pull_remote_database(struct ctdb_context *ctdb,
539                                 struct ctdb_recoverd *rec, 
540                                 struct ctdb_node_map *nodemap, 
541                                 struct tdb_wrap *recdb, uint32_t dbid)
542 {
543         int j;
544
545         /* pull all records from all other nodes across onto this node
546            (this merges based on rsn)
547         */
548         for (j=0; j<nodemap->num; j++) {
549                 /* dont merge from nodes that are unavailable */
550                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
551                         continue;
552                 }
553                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
554                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
555                                  nodemap->nodes[j].pnn));
556                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
557                         return -1;
558                 }
559         }
560         
561         return 0;
562 }
563
564
565 /*
566   update flags on all active nodes
567  */
568 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
569 {
570         int ret;
571
572         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
573                 if (ret != 0) {
574                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
575                 return -1;
576         }
577
578         return 0;
579 }
580
581 /*
582   ensure all nodes have the same vnnmap we do
583  */
584 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
585                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
586 {
587         int j, ret;
588
589         /* push the new vnn map out to all the nodes */
590         for (j=0; j<nodemap->num; j++) {
591                 /* dont push to nodes that are unavailable */
592                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
593                         continue;
594                 }
595
596                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
597                 if (ret != 0) {
598                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
599                         return -1;
600                 }
601         }
602
603         return 0;
604 }
605
606
607 struct vacuum_info {
608         struct vacuum_info *next, *prev;
609         struct ctdb_recoverd *rec;
610         uint32_t srcnode;
611         struct ctdb_db_context *ctdb_db;
612         struct ctdb_marshall_buffer *recs;
613         struct ctdb_rec_data *r;
614 };
615
616 static void vacuum_fetch_next(struct vacuum_info *v);
617
618 /*
619   called when a vacuum fetch has completed - just free it and do the next one
620  */
621 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
622 {
623         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
624         talloc_free(state);
625         vacuum_fetch_next(v);
626 }
627
628
629 /*
630   process the next element from the vacuum list
631 */
632 static void vacuum_fetch_next(struct vacuum_info *v)
633 {
634         struct ctdb_call call;
635         struct ctdb_rec_data *r;
636
637         while (v->recs->count) {
638                 struct ctdb_client_call_state *state;
639                 TDB_DATA data;
640                 struct ctdb_ltdb_header *hdr;
641
642                 ZERO_STRUCT(call);
643                 call.call_id = CTDB_NULL_FUNC;
644                 call.flags = CTDB_IMMEDIATE_MIGRATION;
645
646                 r = v->r;
647                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
648                 v->recs->count--;
649
650                 call.key.dptr = &r->data[0];
651                 call.key.dsize = r->keylen;
652
653                 /* ensure we don't block this daemon - just skip a record if we can't get
654                    the chainlock */
655                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
656                         continue;
657                 }
658
659                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
660                 if (data.dptr == NULL) {
661                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
662                         continue;
663                 }
664
665                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
666                         free(data.dptr);
667                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
668                         continue;
669                 }
670                 
671                 hdr = (struct ctdb_ltdb_header *)data.dptr;
672                 if (hdr->dmaster == v->rec->ctdb->pnn) {
673                         /* its already local */
674                         free(data.dptr);
675                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
676                         continue;
677                 }
678
679                 free(data.dptr);
680
681                 state = ctdb_call_send(v->ctdb_db, &call);
682                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
683                 if (state == NULL) {
684                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
685                         talloc_free(v);
686                         return;
687                 }
688                 state->async.fn = vacuum_fetch_callback;
689                 state->async.private_data = v;
690                 return;
691         }
692
693         talloc_free(v);
694 }
695
696
697 /*
698   destroy a vacuum info structure
699  */
700 static int vacuum_info_destructor(struct vacuum_info *v)
701 {
702         DLIST_REMOVE(v->rec->vacuum_info, v);
703         return 0;
704 }
705
706
707 /*
708   handler for vacuum fetch
709 */
710 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
711                                  TDB_DATA data, void *private_data)
712 {
713         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
714         struct ctdb_marshall_buffer *recs;
715         int ret, i;
716         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
717         const char *name;
718         struct ctdb_dbid_map *dbmap=NULL;
719         bool persistent = false;
720         struct ctdb_db_context *ctdb_db;
721         struct ctdb_rec_data *r;
722         uint32_t srcnode;
723         struct vacuum_info *v;
724
725         recs = (struct ctdb_marshall_buffer *)data.dptr;
726         r = (struct ctdb_rec_data *)&recs->data[0];
727
728         if (recs->count == 0) {
729                 talloc_free(tmp_ctx);
730                 return;
731         }
732
733         srcnode = r->reqid;
734
735         for (v=rec->vacuum_info;v;v=v->next) {
736                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
737                         /* we're already working on records from this node */
738                         talloc_free(tmp_ctx);
739                         return;
740                 }
741         }
742
743         /* work out if the database is persistent */
744         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
745         if (ret != 0) {
746                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
747                 talloc_free(tmp_ctx);
748                 return;
749         }
750
751         for (i=0;i<dbmap->num;i++) {
752                 if (dbmap->dbs[i].dbid == recs->db_id) {
753                         persistent = dbmap->dbs[i].persistent;
754                         break;
755                 }
756         }
757         if (i == dbmap->num) {
758                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
759                 talloc_free(tmp_ctx);
760                 return;         
761         }
762
763         /* find the name of this database */
764         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
765                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
766                 talloc_free(tmp_ctx);
767                 return;
768         }
769
770         /* attach to it */
771         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
772         if (ctdb_db == NULL) {
773                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
774                 talloc_free(tmp_ctx);
775                 return;
776         }
777
778         v = talloc_zero(rec, struct vacuum_info);
779         if (v == NULL) {
780                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
781                 talloc_free(tmp_ctx);
782                 return;
783         }
784
785         v->rec = rec;
786         v->srcnode = srcnode;
787         v->ctdb_db = ctdb_db;
788         v->recs = talloc_memdup(v, recs, data.dsize);
789         if (v->recs == NULL) {
790                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
791                 talloc_free(v);
792                 talloc_free(tmp_ctx);
793                 return;         
794         }
795         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
796
797         DLIST_ADD(rec->vacuum_info, v);
798
799         talloc_set_destructor(v, vacuum_info_destructor);
800
801         vacuum_fetch_next(v);
802         talloc_free(tmp_ctx);
803 }
804
805
806 /*
807   called when ctdb_wait_timeout should finish
808  */
809 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
810                               struct timeval yt, void *p)
811 {
812         uint32_t *timed_out = (uint32_t *)p;
813         (*timed_out) = 1;
814 }
815
816 /*
817   wait for a given number of seconds
818  */
819 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
820 {
821         uint32_t timed_out = 0;
822         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
823         while (!timed_out) {
824                 event_loop_once(ctdb->ev);
825         }
826 }
827
828 /*
829   called when an election times out (ends)
830  */
831 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
832                                   struct timeval t, void *p)
833 {
834         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
835         rec->election_timeout = NULL;
836
837         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
838 }
839
840
841 /*
842   wait for an election to finish. It finished election_timeout seconds after
843   the last election packet is received
844  */
845 static void ctdb_wait_election(struct ctdb_recoverd *rec)
846 {
847         struct ctdb_context *ctdb = rec->ctdb;
848         while (rec->election_timeout) {
849                 event_loop_once(ctdb->ev);
850         }
851 }
852
853 /*
854   Update our local flags from all remote connected nodes. 
855   This is only run when we are or we belive we are the recovery master
856  */
857 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
858 {
859         int j;
860         struct ctdb_context *ctdb = rec->ctdb;
861         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
862
863         /* get the nodemap for all active remote nodes and verify
864            they are the same as for this node
865          */
866         for (j=0; j<nodemap->num; j++) {
867                 struct ctdb_node_map *remote_nodemap=NULL;
868                 int ret;
869
870                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
871                         continue;
872                 }
873                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
874                         continue;
875                 }
876
877                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
878                                            mem_ctx, &remote_nodemap);
879                 if (ret != 0) {
880                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
881                                   nodemap->nodes[j].pnn));
882                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
883                         talloc_free(mem_ctx);
884                         return MONITOR_FAILED;
885                 }
886                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
887                         /* We should tell our daemon about this so it
888                            updates its flags or else we will log the same 
889                            message again in the next iteration of recovery.
890                            Since we are the recovery master we can just as
891                            well update the flags on all nodes.
892                         */
893                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
894                         if (ret != 0) {
895                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
896                                 return -1;
897                         }
898
899                         /* Update our local copy of the flags in the recovery
900                            daemon.
901                         */
902                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
903                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
904                                  nodemap->nodes[j].flags));
905                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
906                 }
907                 talloc_free(remote_nodemap);
908         }
909         talloc_free(mem_ctx);
910         return MONITOR_OK;
911 }
912
913
914 /* Create a new random generation ip. 
915    The generation id can not be the INVALID_GENERATION id
916 */
917 static uint32_t new_generation(void)
918 {
919         uint32_t generation;
920
921         while (1) {
922                 generation = random();
923
924                 if (generation != INVALID_GENERATION) {
925                         break;
926                 }
927         }
928
929         return generation;
930 }
931
932
933 /*
934   create a temporary working database
935  */
936 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
937 {
938         char *name;
939         struct tdb_wrap *recdb;
940         unsigned tdb_flags;
941
942         /* open up the temporary recovery database */
943         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
944         if (name == NULL) {
945                 return NULL;
946         }
947         unlink(name);
948
949         tdb_flags = TDB_NOLOCK;
950         if (!ctdb->do_setsched) {
951                 tdb_flags |= TDB_NOMMAP;
952         }
953
954         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
955                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
956         if (recdb == NULL) {
957                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
958         }
959
960         talloc_free(name);
961
962         return recdb;
963 }
964
965
966 /* 
967    a traverse function for pulling all relevent records from recdb
968  */
969 struct recdb_data {
970         struct ctdb_context *ctdb;
971         struct ctdb_marshall_buffer *recdata;
972         uint32_t len;
973         bool failed;
974 };
975
976 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
977 {
978         struct recdb_data *params = (struct recdb_data *)p;
979         struct ctdb_rec_data *rec;
980         struct ctdb_ltdb_header *hdr;
981
982         /* skip empty records */
983         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
984                 return 0;
985         }
986
987         /* update the dmaster field to point to us */
988         hdr = (struct ctdb_ltdb_header *)data.dptr;
989         hdr->dmaster = params->ctdb->pnn;
990
991         /* add the record to the blob ready to send to the nodes */
992         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
993         if (rec == NULL) {
994                 params->failed = true;
995                 return -1;
996         }
997         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
998         if (params->recdata == NULL) {
999                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1000                          rec->length + params->len, params->recdata->count));
1001                 params->failed = true;
1002                 return -1;
1003         }
1004         params->recdata->count++;
1005         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1006         params->len += rec->length;
1007         talloc_free(rec);
1008
1009         return 0;
1010 }
1011
1012 /*
1013   push the recdb database out to all nodes
1014  */
1015 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1016                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1017 {
1018         struct recdb_data params;
1019         struct ctdb_marshall_buffer *recdata;
1020         TDB_DATA outdata;
1021         TALLOC_CTX *tmp_ctx;
1022         uint32_t *nodes;
1023
1024         tmp_ctx = talloc_new(ctdb);
1025         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1026
1027         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1028         CTDB_NO_MEMORY(ctdb, recdata);
1029
1030         recdata->db_id = dbid;
1031
1032         params.ctdb = ctdb;
1033         params.recdata = recdata;
1034         params.len = offsetof(struct ctdb_marshall_buffer, data);
1035         params.failed = false;
1036
1037         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1038                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1039                 talloc_free(params.recdata);
1040                 talloc_free(tmp_ctx);
1041                 return -1;
1042         }
1043
1044         if (params.failed) {
1045                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1046                 talloc_free(params.recdata);
1047                 talloc_free(tmp_ctx);
1048                 return -1;              
1049         }
1050
1051         recdata = params.recdata;
1052
1053         outdata.dptr = (void *)recdata;
1054         outdata.dsize = params.len;
1055
1056         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1057         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1058                                         nodes,
1059                                         CONTROL_TIMEOUT(), false, outdata,
1060                                         NULL, NULL,
1061                                         NULL) != 0) {
1062                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1063                 talloc_free(recdata);
1064                 talloc_free(tmp_ctx);
1065                 return -1;
1066         }
1067
1068         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1069                   dbid, recdata->count));
1070
1071         talloc_free(recdata);
1072         talloc_free(tmp_ctx);
1073
1074         return 0;
1075 }
1076
1077
1078 /*
1079   go through a full recovery on one database 
1080  */
1081 static int recover_database(struct ctdb_recoverd *rec, 
1082                             TALLOC_CTX *mem_ctx,
1083                             uint32_t dbid,
1084                             uint32_t pnn, 
1085                             struct ctdb_node_map *nodemap,
1086                             uint32_t transaction_id)
1087 {
1088         struct tdb_wrap *recdb;
1089         int ret;
1090         struct ctdb_context *ctdb = rec->ctdb;
1091         TDB_DATA data;
1092         struct ctdb_control_wipe_database w;
1093         uint32_t *nodes;
1094
1095         recdb = create_recdb(ctdb, mem_ctx);
1096         if (recdb == NULL) {
1097                 return -1;
1098         }
1099
1100         /* pull all remote databases onto the recdb */
1101         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1102         if (ret != 0) {
1103                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1104                 return -1;
1105         }
1106
1107         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1108
1109         /* wipe all the remote databases. This is safe as we are in a transaction */
1110         w.db_id = dbid;
1111         w.transaction_id = transaction_id;
1112
1113         data.dptr = (void *)&w;
1114         data.dsize = sizeof(w);
1115
1116         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1118                                         nodes,
1119                                         CONTROL_TIMEOUT(), false, data,
1120                                         NULL, NULL,
1121                                         NULL) != 0) {
1122                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1123                 talloc_free(recdb);
1124                 return -1;
1125         }
1126         
1127         /* push out the correct database. This sets the dmaster and skips 
1128            the empty records */
1129         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1130         if (ret != 0) {
1131                 talloc_free(recdb);
1132                 return -1;
1133         }
1134
1135         /* all done with this database */
1136         talloc_free(recdb);
1137
1138         return 0;
1139 }
1140
1141 /*
1142   reload the nodes file 
1143 */
1144 static void reload_nodes_file(struct ctdb_context *ctdb)
1145 {
1146         ctdb->nodes = NULL;
1147         ctdb_load_nodes_file(ctdb);
1148 }
1149
1150         
1151 /*
1152   we are the recmaster, and recovery is needed - start a recovery run
1153  */
1154 static int do_recovery(struct ctdb_recoverd *rec, 
1155                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1156                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1157 {
1158         struct ctdb_context *ctdb = rec->ctdb;
1159         int i, j, ret;
1160         uint32_t generation;
1161         struct ctdb_dbid_map *dbmap;
1162         TDB_DATA data;
1163         uint32_t *nodes;
1164         struct timeval start_time;
1165
1166         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1167
1168         /* if recovery fails, force it again */
1169         rec->need_recovery = true;
1170
1171         for (i=0; i<ctdb->num_nodes; i++) {
1172                 struct ctdb_banning_state *ban_state;
1173
1174                 if (ctdb->nodes[i]->ban_state == NULL) {
1175                         continue;
1176                 }
1177                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1178                 if (ban_state->count < 2*ctdb->num_nodes) {
1179                         continue;
1180                 }
1181                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1182                         ctdb->nodes[i]->pnn, ban_state->count,
1183                         ctdb->tunable.recovery_ban_period));
1184                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1185                 ban_state->count = 0;
1186         }
1187
1188
1189         if (ctdb->tunable.verify_recovery_lock != 0) {
1190                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1191                 start_time = timeval_current();
1192                 if (!ctdb_recovery_lock(ctdb, true)) {
1193                         ctdb_set_culprit(rec, pnn);
1194                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1195                         return -1;
1196                 }
1197                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1198                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1199         }
1200
1201         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1202
1203         /* get a list of all databases */
1204         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1205         if (ret != 0) {
1206                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1207                 return -1;
1208         }
1209
1210         /* we do the db creation before we set the recovery mode, so the freeze happens
1211            on all databases we will be dealing with. */
1212
1213         /* verify that we have all the databases any other node has */
1214         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1215         if (ret != 0) {
1216                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1217                 return -1;
1218         }
1219
1220         /* verify that all other nodes have all our databases */
1221         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1222         if (ret != 0) {
1223                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1224                 return -1;
1225         }
1226
1227         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1228
1229
1230         /* set recovery mode to active on all nodes */
1231         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1232         if (ret != 0) {
1233                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1234                 return -1;
1235         }
1236
1237         /* execute the "startrecovery" event script on all nodes */
1238         ret = run_startrecovery_eventscript(rec, nodemap);
1239         if (ret!=0) {
1240                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1241                 return -1;
1242         }
1243
1244         /* pick a new generation number */
1245         generation = new_generation();
1246
1247         /* change the vnnmap on this node to use the new generation 
1248            number but not on any other nodes.
1249            this guarantees that if we abort the recovery prematurely
1250            for some reason (a node stops responding?)
1251            that we can just return immediately and we will reenter
1252            recovery shortly again.
1253            I.e. we deliberately leave the cluster with an inconsistent
1254            generation id to allow us to abort recovery at any stage and
1255            just restart it from scratch.
1256          */
1257         vnnmap->generation = generation;
1258         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1259         if (ret != 0) {
1260                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1261                 return -1;
1262         }
1263
1264         data.dptr = (void *)&generation;
1265         data.dsize = sizeof(uint32_t);
1266
1267         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1268         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1269                                         nodes,
1270                                         CONTROL_TIMEOUT(), false, data,
1271                                         NULL, NULL,
1272                                         NULL) != 0) {
1273                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1274                 return -1;
1275         }
1276
1277         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1278
1279         for (i=0;i<dbmap->num;i++) {
1280                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1281                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1282                         return -1;
1283                 }
1284         }
1285
1286         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1287
1288         /* commit all the changes */
1289         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1290                                         nodes,
1291                                         CONTROL_TIMEOUT(), false, data,
1292                                         NULL, NULL,
1293                                         NULL) != 0) {
1294                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1295                 return -1;
1296         }
1297
1298         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1299         
1300
1301         /* update the capabilities for all nodes */
1302         ret = update_capabilities(ctdb, nodemap);
1303         if (ret!=0) {
1304                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1305                 return -1;
1306         }
1307
1308         /* build a new vnn map with all the currently active and
1309            unbanned nodes */
1310         generation = new_generation();
1311         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1312         CTDB_NO_MEMORY(ctdb, vnnmap);
1313         vnnmap->generation = generation;
1314         vnnmap->size = 0;
1315         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1316         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1317         for (i=j=0;i<nodemap->num;i++) {
1318                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1319                         continue;
1320                 }
1321                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1322                         /* this node can not be an lmaster */
1323                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1324                         continue;
1325                 }
1326
1327                 vnnmap->size++;
1328                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1329                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1330                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1331
1332         }
1333         if (vnnmap->size == 0) {
1334                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1335                 vnnmap->size++;
1336                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1337                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1338                 vnnmap->map[0] = pnn;
1339         }       
1340
1341         /* update to the new vnnmap on all nodes */
1342         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1343         if (ret != 0) {
1344                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1345                 return -1;
1346         }
1347
1348         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1349
1350         /* update recmaster to point to us for all nodes */
1351         ret = set_recovery_master(ctdb, nodemap, pnn);
1352         if (ret!=0) {
1353                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1354                 return -1;
1355         }
1356
1357         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1358
1359         /*
1360           update all nodes to have the same flags that we have
1361          */
1362         for (i=0;i<nodemap->num;i++) {
1363                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1364                         continue;
1365                 }
1366
1367                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1368                 if (ret != 0) {
1369                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1370                         return -1;
1371                 }
1372         }
1373
1374         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1375
1376         /* disable recovery mode */
1377         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1380                 return -1;
1381         }
1382
1383         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1384
1385         /*
1386           tell nodes to takeover their public IPs
1387          */
1388         rec->need_takeover_run = false;
1389         ret = ctdb_takeover_run(ctdb, nodemap);
1390         if (ret != 0) {
1391                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1392                 return -1;
1393         }
1394         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1395
1396         /* execute the "recovered" event script on all nodes */
1397         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1398         if (ret!=0) {
1399                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1400                 return -1;
1401         }
1402
1403         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1404
1405         /* send a message to all clients telling them that the cluster 
1406            has been reconfigured */
1407         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1408
1409         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1410
1411         rec->need_recovery = false;
1412
1413         /* we managed to complete a full recovery, make sure to forgive
1414            any past sins by the nodes that could now participate in the
1415            recovery.
1416         */
1417         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1418         for (i=0;i<nodemap->num;i++) {
1419                 struct ctdb_banning_state *ban_state;
1420
1421                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1422                         continue;
1423                 }
1424
1425                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1426                 if (ban_state == NULL) {
1427                         continue;
1428                 }
1429
1430                 ban_state->count = 0;
1431         }
1432
1433
1434         /* We just finished a recovery successfully. 
1435            We now wait for rerecovery_timeout before we allow 
1436            another recovery to take place.
1437         */
1438         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1439         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1440         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1441
1442         return 0;
1443 }
1444
1445
1446 /*
1447   elections are won by first checking the number of connected nodes, then
1448   the priority time, then the pnn
1449  */
1450 struct election_message {
1451         uint32_t num_connected;
1452         struct timeval priority_time;
1453         uint32_t pnn;
1454         uint32_t node_flags;
1455 };
1456
1457 /*
1458   form this nodes election data
1459  */
1460 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1461 {
1462         int ret, i;
1463         struct ctdb_node_map *nodemap;
1464         struct ctdb_context *ctdb = rec->ctdb;
1465
1466         ZERO_STRUCTP(em);
1467
1468         em->pnn = rec->ctdb->pnn;
1469         em->priority_time = rec->priority_time;
1470
1471         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1472         if (ret != 0) {
1473                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1474                 return;
1475         }
1476
1477         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1478         em->node_flags = rec->node_flags;
1479
1480         for (i=0;i<nodemap->num;i++) {
1481                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1482                         em->num_connected++;
1483                 }
1484         }
1485
1486         /* we shouldnt try to win this election if we cant be a recmaster */
1487         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1488                 em->num_connected = 0;
1489                 em->priority_time = timeval_current();
1490         }
1491
1492         talloc_free(nodemap);
1493 }
1494
1495 /*
1496   see if the given election data wins
1497  */
1498 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1499 {
1500         struct election_message myem;
1501         int cmp = 0;
1502
1503         ctdb_election_data(rec, &myem);
1504
1505         /* we cant win if we dont have the recmaster capability */
1506         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1507                 return false;
1508         }
1509
1510         /* we cant win if we are banned */
1511         if (rec->node_flags & NODE_FLAGS_BANNED) {
1512                 return false;
1513         }       
1514
1515         /* we cant win if we are stopped */
1516         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1517                 return false;
1518         }       
1519
1520         /* we will automatically win if the other node is banned */
1521         if (em->node_flags & NODE_FLAGS_BANNED) {
1522                 return true;
1523         }
1524
1525         /* we will automatically win if the other node is banned */
1526         if (em->node_flags & NODE_FLAGS_STOPPED) {
1527                 return true;
1528         }
1529
1530         /* try to use the most connected node */
1531         if (cmp == 0) {
1532                 cmp = (int)myem.num_connected - (int)em->num_connected;
1533         }
1534
1535         /* then the longest running node */
1536         if (cmp == 0) {
1537                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1538         }
1539
1540         if (cmp == 0) {
1541                 cmp = (int)myem.pnn - (int)em->pnn;
1542         }
1543
1544         return cmp > 0;
1545 }
1546
1547 /*
1548   send out an election request
1549  */
1550 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1551 {
1552         int ret;
1553         TDB_DATA election_data;
1554         struct election_message emsg;
1555         uint64_t srvid;
1556         struct ctdb_context *ctdb = rec->ctdb;
1557
1558         srvid = CTDB_SRVID_RECOVERY;
1559
1560         ctdb_election_data(rec, &emsg);
1561
1562         election_data.dsize = sizeof(struct election_message);
1563         election_data.dptr  = (unsigned char *)&emsg;
1564
1565
1566         /* send an election message to all active nodes */
1567         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1568         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1569
1570
1571         /* A new node that is already frozen has entered the cluster.
1572            The existing nodes are not frozen and dont need to be frozen
1573            until the election has ended and we start the actual recovery
1574         */
1575         if (update_recmaster == true) {
1576                 /* first we assume we will win the election and set 
1577                    recoverymaster to be ourself on the current node
1578                  */
1579                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1580                 if (ret != 0) {
1581                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1582                         return -1;
1583                 }
1584         }
1585
1586
1587         return 0;
1588 }
1589
1590 /*
1591   this function will unban all nodes in the cluster
1592 */
1593 static void unban_all_nodes(struct ctdb_context *ctdb)
1594 {
1595         int ret, i;
1596         struct ctdb_node_map *nodemap;
1597         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1598         
1599         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1600         if (ret != 0) {
1601                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1602                 return;
1603         }
1604
1605         for (i=0;i<nodemap->num;i++) {
1606                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1607                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1608                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1609                 }
1610         }
1611
1612         talloc_free(tmp_ctx);
1613 }
1614
1615
1616 /*
1617   we think we are winning the election - send a broadcast election request
1618  */
1619 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1620 {
1621         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1622         int ret;
1623
1624         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1625         if (ret != 0) {
1626                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1627         }
1628
1629         talloc_free(rec->send_election_te);
1630         rec->send_election_te = NULL;
1631 }
1632
1633 /*
1634   handler for memory dumps
1635 */
1636 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1637                              TDB_DATA data, void *private_data)
1638 {
1639         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1640         TDB_DATA *dump;
1641         int ret;
1642         struct rd_memdump_reply *rd;
1643
1644         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1645                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1646                 talloc_free(tmp_ctx);
1647                 return;
1648         }
1649         rd = (struct rd_memdump_reply *)data.dptr;
1650
1651         dump = talloc_zero(tmp_ctx, TDB_DATA);
1652         if (dump == NULL) {
1653                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1654                 talloc_free(tmp_ctx);
1655                 return;
1656         }
1657         ret = ctdb_dump_memory(ctdb, dump);
1658         if (ret != 0) {
1659                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1660                 talloc_free(tmp_ctx);
1661                 return;
1662         }
1663
1664 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1665
1666         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1667         if (ret != 0) {
1668                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1669                 talloc_free(tmp_ctx);
1670                 return;
1671         }
1672
1673         talloc_free(tmp_ctx);
1674 }
1675
1676 /*
1677   handler for reload_nodes
1678 */
1679 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1680                              TDB_DATA data, void *private_data)
1681 {
1682         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1683
1684         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1685
1686         reload_nodes_file(rec->ctdb);
1687 }
1688
1689
1690 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1691                               struct timeval yt, void *p)
1692 {
1693         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1694
1695         talloc_free(rec->ip_check_disable_ctx);
1696         rec->ip_check_disable_ctx = NULL;
1697 }
1698
1699 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1700                              TDB_DATA data, void *private_data)
1701 {
1702         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1703         uint32_t timeout;
1704
1705         if (rec->ip_check_disable_ctx != NULL) {
1706                 talloc_free(rec->ip_check_disable_ctx);
1707                 rec->ip_check_disable_ctx = NULL;
1708         }
1709
1710         if (data.dsize != sizeof(uint32_t)) {
1711                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu expexting %lu\n", data.dsize, sizeof(uint32_t)));
1712                 return;
1713         }
1714         if (data.dptr == NULL) {
1715                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1716                 return;
1717         }
1718
1719         timeout = *((uint32_t *)data.dptr);
1720         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1721
1722         rec->ip_check_disable_ctx = talloc_new(rec);
1723         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1724
1725         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1726 }
1727
1728
1729 /*
1730   handler for ip reallocate, just add it to the list of callers and 
1731   handle this later in the monitor_cluster loop so we do not recurse
1732   with other callers to takeover_run()
1733 */
1734 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1735                              TDB_DATA data, void *private_data)
1736 {
1737         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1738         struct ip_reallocate_list *caller;
1739
1740         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1741                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1742                 return;
1743         }
1744
1745         if (rec->ip_reallocate_ctx == NULL) {
1746                 rec->ip_reallocate_ctx = talloc_new(rec);
1747                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1748         }
1749
1750         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1751         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1752
1753         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1754         caller->next = rec->reallocate_callers;
1755         rec->reallocate_callers = caller;
1756
1757         return;
1758 }
1759
1760 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1761 {
1762         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1763         TDB_DATA result;
1764         int32_t ret;
1765         struct ip_reallocate_list *callers;
1766
1767         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1768         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1769         result.dsize = sizeof(int32_t);
1770         result.dptr  = (uint8_t *)&ret;
1771
1772         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1773                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1774                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1775                 if (ret != 0) {
1776                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1777                 }
1778         }
1779
1780         talloc_free(tmp_ctx);
1781         talloc_free(rec->ip_reallocate_ctx);
1782         rec->ip_reallocate_ctx = NULL;
1783         rec->reallocate_callers = NULL;
1784         
1785 }
1786
1787
1788 /*
1789   handler for recovery master elections
1790 */
1791 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1792                              TDB_DATA data, void *private_data)
1793 {
1794         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1795         int ret;
1796         struct election_message *em = (struct election_message *)data.dptr;
1797         TALLOC_CTX *mem_ctx;
1798
1799         /* we got an election packet - update the timeout for the election */
1800         talloc_free(rec->election_timeout);
1801         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1802                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1803                                                 ctdb_election_timeout, rec);
1804
1805         mem_ctx = talloc_new(ctdb);
1806
1807         /* someone called an election. check their election data
1808            and if we disagree and we would rather be the elected node, 
1809            send a new election message to all other nodes
1810          */
1811         if (ctdb_election_win(rec, em)) {
1812                 if (!rec->send_election_te) {
1813                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1814                                                                 timeval_current_ofs(0, 500000),
1815                                                                 election_send_request, rec);
1816                 }
1817                 talloc_free(mem_ctx);
1818                 /*unban_all_nodes(ctdb);*/
1819                 return;
1820         }
1821         
1822         /* we didn't win */
1823         talloc_free(rec->send_election_te);
1824         rec->send_election_te = NULL;
1825
1826         if (ctdb->tunable.verify_recovery_lock != 0) {
1827                 /* release the recmaster lock */
1828                 if (em->pnn != ctdb->pnn &&
1829                     ctdb->recovery_lock_fd != -1) {
1830                         close(ctdb->recovery_lock_fd);
1831                         ctdb->recovery_lock_fd = -1;
1832                         unban_all_nodes(ctdb);
1833                 }
1834         }
1835
1836         /* ok, let that guy become recmaster then */
1837         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1838         if (ret != 0) {
1839                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1840                 talloc_free(mem_ctx);
1841                 return;
1842         }
1843
1844         talloc_free(mem_ctx);
1845         return;
1846 }
1847
1848
1849 /*
1850   force the start of the election process
1851  */
1852 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1853                            struct ctdb_node_map *nodemap)
1854 {
1855         int ret;
1856         struct ctdb_context *ctdb = rec->ctdb;
1857
1858         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1859
1860         /* set all nodes to recovery mode to stop all internode traffic */
1861         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1862         if (ret != 0) {
1863                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1864                 return;
1865         }
1866
1867         talloc_free(rec->election_timeout);
1868         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1869                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1870                                                 ctdb_election_timeout, rec);
1871
1872         ret = send_election_request(rec, pnn, true);
1873         if (ret!=0) {
1874                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1875                 return;
1876         }
1877
1878         /* wait for a few seconds to collect all responses */
1879         ctdb_wait_election(rec);
1880 }
1881
1882
1883
1884 /*
1885   handler for when a node changes its flags
1886 */
1887 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1888                             TDB_DATA data, void *private_data)
1889 {
1890         int ret;
1891         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1892         struct ctdb_node_map *nodemap=NULL;
1893         TALLOC_CTX *tmp_ctx;
1894         uint32_t changed_flags;
1895         int i;
1896         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1897
1898         if (data.dsize != sizeof(*c)) {
1899                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1900                 return;
1901         }
1902
1903         tmp_ctx = talloc_new(ctdb);
1904         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1905
1906         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1907         if (ret != 0) {
1908                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1909                 talloc_free(tmp_ctx);
1910                 return;         
1911         }
1912
1913
1914         for (i=0;i<nodemap->num;i++) {
1915                 if (nodemap->nodes[i].pnn == c->pnn) break;
1916         }
1917
1918         if (i == nodemap->num) {
1919                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1920                 talloc_free(tmp_ctx);
1921                 return;
1922         }
1923
1924         changed_flags = c->old_flags ^ c->new_flags;
1925
1926         if (nodemap->nodes[i].flags != c->new_flags) {
1927                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1928         }
1929
1930         nodemap->nodes[i].flags = c->new_flags;
1931
1932         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1933                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1934
1935         if (ret == 0) {
1936                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1937                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1938         }
1939         
1940         if (ret == 0 &&
1941             ctdb->recovery_master == ctdb->pnn &&
1942             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1943                 /* Only do the takeover run if the perm disabled or unhealthy
1944                    flags changed since these will cause an ip failover but not
1945                    a recovery.
1946                    If the node became disconnected or banned this will also
1947                    lead to an ip address failover but that is handled 
1948                    during recovery
1949                 */
1950                 if (changed_flags & NODE_FLAGS_DISABLED) {
1951                         rec->need_takeover_run = true;
1952                 }
1953         }
1954
1955         talloc_free(tmp_ctx);
1956 }
1957
1958 /*
1959   handler for when we need to push out flag changes ot all other nodes
1960 */
1961 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1962                             TDB_DATA data, void *private_data)
1963 {
1964         int ret;
1965         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1966
1967         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1968         if (ret != 0) {
1969                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1970         }
1971 }
1972
1973
1974 struct verify_recmode_normal_data {
1975         uint32_t count;
1976         enum monitor_result status;
1977 };
1978
1979 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1980 {
1981         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1982
1983
1984         /* one more node has responded with recmode data*/
1985         rmdata->count--;
1986
1987         /* if we failed to get the recmode, then return an error and let
1988            the main loop try again.
1989         */
1990         if (state->state != CTDB_CONTROL_DONE) {
1991                 if (rmdata->status == MONITOR_OK) {
1992                         rmdata->status = MONITOR_FAILED;
1993                 }
1994                 return;
1995         }
1996
1997         /* if we got a response, then the recmode will be stored in the
1998            status field
1999         */
2000         if (state->status != CTDB_RECOVERY_NORMAL) {
2001                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2002                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2003         }
2004
2005         return;
2006 }
2007
2008
2009 /* verify that all nodes are in normal recovery mode */
2010 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2011 {
2012         struct verify_recmode_normal_data *rmdata;
2013         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2014         struct ctdb_client_control_state *state;
2015         enum monitor_result status;
2016         int j;
2017         
2018         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2019         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2020         rmdata->count  = 0;
2021         rmdata->status = MONITOR_OK;
2022
2023         /* loop over all active nodes and send an async getrecmode call to 
2024            them*/
2025         for (j=0; j<nodemap->num; j++) {
2026                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2027                         continue;
2028                 }
2029                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2030                                         CONTROL_TIMEOUT(), 
2031                                         nodemap->nodes[j].pnn);
2032                 if (state == NULL) {
2033                         /* we failed to send the control, treat this as 
2034                            an error and try again next iteration
2035                         */                      
2036                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2037                         talloc_free(mem_ctx);
2038                         return MONITOR_FAILED;
2039                 }
2040
2041                 /* set up the callback functions */
2042                 state->async.fn = verify_recmode_normal_callback;
2043                 state->async.private_data = rmdata;
2044
2045                 /* one more control to wait for to complete */
2046                 rmdata->count++;
2047         }
2048
2049
2050         /* now wait for up to the maximum number of seconds allowed
2051            or until all nodes we expect a response from has replied
2052         */
2053         while (rmdata->count > 0) {
2054                 event_loop_once(ctdb->ev);
2055         }
2056
2057         status = rmdata->status;
2058         talloc_free(mem_ctx);
2059         return status;
2060 }
2061
2062
2063 struct verify_recmaster_data {
2064         struct ctdb_recoverd *rec;
2065         uint32_t count;
2066         uint32_t pnn;
2067         enum monitor_result status;
2068 };
2069
2070 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2071 {
2072         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2073
2074
2075         /* one more node has responded with recmaster data*/
2076         rmdata->count--;
2077
2078         /* if we failed to get the recmaster, then return an error and let
2079            the main loop try again.
2080         */
2081         if (state->state != CTDB_CONTROL_DONE) {
2082                 if (rmdata->status == MONITOR_OK) {
2083                         rmdata->status = MONITOR_FAILED;
2084                 }
2085                 return;
2086         }
2087
2088         /* if we got a response, then the recmaster will be stored in the
2089            status field
2090         */
2091         if (state->status != rmdata->pnn) {
2092                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2093                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2094                 rmdata->status = MONITOR_ELECTION_NEEDED;
2095         }
2096
2097         return;
2098 }
2099
2100
2101 /* verify that all nodes agree that we are the recmaster */
2102 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2103 {
2104         struct ctdb_context *ctdb = rec->ctdb;
2105         struct verify_recmaster_data *rmdata;
2106         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2107         struct ctdb_client_control_state *state;
2108         enum monitor_result status;
2109         int j;
2110         
2111         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2112         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2113         rmdata->rec    = rec;
2114         rmdata->count  = 0;
2115         rmdata->pnn    = pnn;
2116         rmdata->status = MONITOR_OK;
2117
2118         /* loop over all active nodes and send an async getrecmaster call to 
2119            them*/
2120         for (j=0; j<nodemap->num; j++) {
2121                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2122                         continue;
2123                 }
2124                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2125                                         CONTROL_TIMEOUT(),
2126                                         nodemap->nodes[j].pnn);
2127                 if (state == NULL) {
2128                         /* we failed to send the control, treat this as 
2129                            an error and try again next iteration
2130                         */                      
2131                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2132                         talloc_free(mem_ctx);
2133                         return MONITOR_FAILED;
2134                 }
2135
2136                 /* set up the callback functions */
2137                 state->async.fn = verify_recmaster_callback;
2138                 state->async.private_data = rmdata;
2139
2140                 /* one more control to wait for to complete */
2141                 rmdata->count++;
2142         }
2143
2144
2145         /* now wait for up to the maximum number of seconds allowed
2146            or until all nodes we expect a response from has replied
2147         */
2148         while (rmdata->count > 0) {
2149                 event_loop_once(ctdb->ev);
2150         }
2151
2152         status = rmdata->status;
2153         talloc_free(mem_ctx);
2154         return status;
2155 }
2156
2157
2158 /* called to check that the allocation of public ip addresses is ok.
2159 */
2160 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2161 {
2162         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2163         struct ctdb_all_public_ips *ips = NULL;
2164         struct ctdb_uptime *uptime1 = NULL;
2165         struct ctdb_uptime *uptime2 = NULL;
2166         int ret, j;
2167
2168         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2169                                 CTDB_CURRENT_NODE, &uptime1);
2170         if (ret != 0) {
2171                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2172                 talloc_free(mem_ctx);
2173                 return -1;
2174         }
2175
2176         /* read the ip allocation from the local node */
2177         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2178         if (ret != 0) {
2179                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2180                 talloc_free(mem_ctx);
2181                 return -1;
2182         }
2183
2184         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2185                                 CTDB_CURRENT_NODE, &uptime2);
2186         if (ret != 0) {
2187                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2188                 talloc_free(mem_ctx);
2189                 return -1;
2190         }
2191
2192         /* skip the check if the startrecovery time has changed */
2193         if (timeval_compare(&uptime1->last_recovery_started,
2194                             &uptime2->last_recovery_started) != 0) {
2195                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2196                 talloc_free(mem_ctx);
2197                 return 0;
2198         }
2199
2200         /* skip the check if the endrecovery time has changed */
2201         if (timeval_compare(&uptime1->last_recovery_finished,
2202                             &uptime2->last_recovery_finished) != 0) {
2203                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2204                 talloc_free(mem_ctx);
2205                 return 0;
2206         }
2207
2208         /* skip the check if we have started but not finished recovery */
2209         if (timeval_compare(&uptime1->last_recovery_finished,
2210                             &uptime1->last_recovery_started) != 1) {
2211                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2212                 talloc_free(mem_ctx);
2213
2214                 return 0;
2215         }
2216
2217         /* verify that we have the ip addresses we should have
2218            and we dont have ones we shouldnt have.
2219            if we find an inconsistency we set recmode to
2220            active on the local node and wait for the recmaster
2221            to do a full blown recovery
2222         */
2223         for (j=0; j<ips->num; j++) {
2224                 if (ips->ips[j].pnn == pnn) {
2225                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2226                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2227                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2228                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2229                                 if (ret != 0) {
2230                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2231
2232                                         talloc_free(mem_ctx);
2233                                         return -1;
2234                                 }
2235                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2236                                 if (ret != 0) {
2237                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2238
2239                                         talloc_free(mem_ctx);
2240                                         return -1;
2241                                 }
2242                         }
2243                 } else {
2244                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2245                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2246                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2247
2248                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2249                                 if (ret != 0) {
2250                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2251
2252                                         talloc_free(mem_ctx);
2253                                         return -1;
2254                                 }
2255                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2256                                 if (ret != 0) {
2257                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2258
2259                                         talloc_free(mem_ctx);
2260                                         return -1;
2261                                 }
2262                         }
2263                 }
2264         }
2265
2266         talloc_free(mem_ctx);
2267         return 0;
2268 }
2269
2270
2271 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2272 {
2273         struct ctdb_node_map **remote_nodemaps = callback_data;
2274
2275         if (node_pnn >= ctdb->num_nodes) {
2276                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2277                 return;
2278         }
2279
2280         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2281
2282 }
2283
2284 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2285         struct ctdb_node_map *nodemap,
2286         struct ctdb_node_map **remote_nodemaps)
2287 {
2288         uint32_t *nodes;
2289
2290         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2291         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2292                                         nodes,
2293                                         CONTROL_TIMEOUT(), false, tdb_null,
2294                                         async_getnodemap_callback,
2295                                         NULL,
2296                                         remote_nodemaps) != 0) {
2297                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2298
2299                 return -1;
2300         }
2301
2302         return 0;
2303 }
2304
2305 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2306 struct ctdb_check_reclock_state {
2307         struct ctdb_context *ctdb;
2308         struct timeval start_time;
2309         int fd[2];
2310         pid_t child;
2311         struct timed_event *te;
2312         struct fd_event *fde;
2313         enum reclock_child_status status;
2314 };
2315
2316 /* when we free the reclock state we must kill any child process.
2317 */
2318 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2319 {
2320         struct ctdb_context *ctdb = state->ctdb;
2321
2322         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2323
2324         if (state->fd[0] != -1) {
2325                 close(state->fd[0]);
2326                 state->fd[0] = -1;
2327         }
2328         if (state->fd[1] != -1) {
2329                 close(state->fd[1]);
2330                 state->fd[1] = -1;
2331         }
2332         kill(state->child, SIGKILL);
2333         return 0;
2334 }
2335
2336 /*
2337   called if our check_reclock child times out. this would happen if
2338   i/o to the reclock file blocks.
2339  */
2340 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2341                                          struct timeval t, void *private_data)
2342 {
2343         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2344                                            struct ctdb_check_reclock_state);
2345
2346         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2347         state->status = RECLOCK_TIMEOUT;
2348 }
2349
2350 /* this is called when the child process has completed checking the reclock
2351    file and has written data back to us through the pipe.
2352 */
2353 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2354                              uint16_t flags, void *private_data)
2355 {
2356         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2357                                              struct ctdb_check_reclock_state);
2358         char c = 0;
2359         int ret;
2360
2361         /* we got a response from our child process so we can abort the
2362            timeout.
2363         */
2364         talloc_free(state->te);
2365         state->te = NULL;
2366
2367         ret = read(state->fd[0], &c, 1);
2368         if (ret != 1 || c != RECLOCK_OK) {
2369                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2370                 state->status = RECLOCK_FAILED;
2371
2372                 return;
2373         }
2374
2375         state->status = RECLOCK_OK;
2376         return;
2377 }
2378
2379 static int check_recovery_lock(struct ctdb_context *ctdb)
2380 {
2381         int ret;
2382         struct ctdb_check_reclock_state *state;
2383         pid_t parent = getpid();
2384
2385         if (ctdb->recovery_lock_fd == -1) {
2386                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2387                 return -1;
2388         }
2389
2390         state = talloc(ctdb, struct ctdb_check_reclock_state);
2391         CTDB_NO_MEMORY(ctdb, state);
2392
2393         state->ctdb = ctdb;
2394         state->start_time = timeval_current();
2395         state->status = RECLOCK_CHECKING;
2396         state->fd[0] = -1;
2397         state->fd[1] = -1;
2398
2399         ret = pipe(state->fd);
2400         if (ret != 0) {
2401                 talloc_free(state);
2402                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2403                 return -1;
2404         }
2405
2406         state->child = fork();
2407         if (state->child == (pid_t)-1) {
2408                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2409                 close(state->fd[0]);
2410                 state->fd[0] = -1;
2411                 close(state->fd[1]);
2412                 state->fd[1] = -1;
2413                 talloc_free(state);
2414                 return -1;
2415         }
2416
2417         if (state->child == 0) {
2418                 char cc = RECLOCK_OK;
2419                 close(state->fd[0]);
2420                 state->fd[0] = -1;
2421
2422                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2423                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2424                         cc = RECLOCK_FAILED;
2425                 }
2426
2427                 write(state->fd[1], &cc, 1);
2428                 /* make sure we die when our parent dies */
2429                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2430                         sleep(5);
2431                         write(state->fd[1], &cc, 1);
2432                 }
2433                 _exit(0);
2434         }
2435         close(state->fd[1]);
2436         state->fd[1] = -1;
2437
2438         talloc_set_destructor(state, check_reclock_destructor);
2439
2440         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2441                                     ctdb_check_reclock_timeout, state);
2442         if (state->te == NULL) {
2443                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2444                 talloc_free(state);
2445                 return -1;
2446         }
2447
2448         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2449                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2450                                 reclock_child_handler,
2451                                 (void *)state);
2452
2453         if (state->fde == NULL) {
2454                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2455                 talloc_free(state);
2456                 return -1;
2457         }
2458
2459         while (state->status == RECLOCK_CHECKING) {
2460                 event_loop_once(ctdb->ev);
2461         }
2462
2463         if (state->status == RECLOCK_FAILED) {
2464                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2465                 close(ctdb->recovery_lock_fd);
2466                 ctdb->recovery_lock_fd = -1;
2467                 talloc_free(state);
2468                 return -1;
2469         }
2470
2471         talloc_free(state);
2472         return 0;
2473 }
2474
2475 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2476 {
2477         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2478         const char *reclockfile;
2479
2480         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2481                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2482                 talloc_free(tmp_ctx);
2483                 return -1;      
2484         }
2485
2486         if (reclockfile == NULL) {
2487                 if (ctdb->recovery_lock_file != NULL) {
2488                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2489                         talloc_free(ctdb->recovery_lock_file);
2490                         ctdb->recovery_lock_file = NULL;
2491                         if (ctdb->recovery_lock_fd != -1) {
2492                                 close(ctdb->recovery_lock_fd);
2493                                 ctdb->recovery_lock_fd = -1;
2494                         }
2495                 }
2496                 ctdb->tunable.verify_recovery_lock = 0;
2497                 talloc_free(tmp_ctx);
2498                 return 0;
2499         }
2500
2501         if (ctdb->recovery_lock_file == NULL) {
2502                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2503                 if (ctdb->recovery_lock_fd != -1) {
2504                         close(ctdb->recovery_lock_fd);
2505                         ctdb->recovery_lock_fd = -1;
2506                 }
2507                 talloc_free(tmp_ctx);
2508                 return 0;
2509         }
2510
2511
2512         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2513                 talloc_free(tmp_ctx);
2514                 return 0;
2515         }
2516
2517         talloc_free(ctdb->recovery_lock_file);
2518         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2519         ctdb->tunable.verify_recovery_lock = 0;
2520         if (ctdb->recovery_lock_fd != -1) {
2521                 close(ctdb->recovery_lock_fd);
2522                 ctdb->recovery_lock_fd = -1;
2523         }
2524
2525         talloc_free(tmp_ctx);
2526         return 0;
2527 }
2528                 
2529 /*
2530   the main monitoring loop
2531  */
2532 static void monitor_cluster(struct ctdb_context *ctdb)
2533 {
2534         uint32_t pnn;
2535         TALLOC_CTX *mem_ctx=NULL;
2536         struct ctdb_node_map *nodemap=NULL;
2537         struct ctdb_node_map *recmaster_nodemap=NULL;
2538         struct ctdb_node_map **remote_nodemaps=NULL;
2539         struct ctdb_vnn_map *vnnmap=NULL;
2540         struct ctdb_vnn_map *remote_vnnmap=NULL;
2541         int32_t debug_level;
2542         int i, j, ret;
2543         struct ctdb_recoverd *rec;
2544
2545         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2546
2547         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2548         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2549
2550         rec->ctdb = ctdb;
2551
2552         rec->priority_time = timeval_current();
2553
2554         /* register a message port for sending memory dumps */
2555         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2556
2557         /* register a message port for recovery elections */
2558         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2559
2560         /* when nodes are disabled/enabled */
2561         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2562
2563         /* when we are asked to puch out a flag change */
2564         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2565
2566         /* register a message port for vacuum fetch */
2567         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2568
2569         /* register a message port for reloadnodes  */
2570         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2571
2572         /* register a message port for performing a takeover run */
2573         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2574
2575         /* register a message port for disabling the ip check for a short while */
2576         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2577
2578 again:
2579         if (mem_ctx) {
2580                 talloc_free(mem_ctx);
2581                 mem_ctx = NULL;
2582         }
2583         mem_ctx = talloc_new(ctdb);
2584         if (!mem_ctx) {
2585                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2586                 exit(-1);
2587         }
2588
2589         /* we only check for recovery once every second */
2590         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2591
2592         /* verify that the main daemon is still running */
2593         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2594                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2595                 exit(-1);
2596         }
2597
2598         /* ping the local daemon to tell it we are alive */
2599         ctdb_ctrl_recd_ping(ctdb);
2600
2601         if (rec->election_timeout) {
2602                 /* an election is in progress */
2603                 goto again;
2604         }
2605
2606         /* read the debug level from the parent and update locally */
2607         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2608         if (ret !=0) {
2609                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2610                 goto again;
2611         }
2612         LogLevel = debug_level;
2613
2614
2615         /* We must check if we need to ban a node here but we want to do this
2616            as early as possible so we dont wait until we have pulled the node
2617            map from the local node. thats why we have the hardcoded value 20
2618         */
2619         for (i=0; i<ctdb->num_nodes; i++) {
2620                 struct ctdb_banning_state *ban_state;
2621
2622                 if (ctdb->nodes[i]->ban_state == NULL) {
2623                         continue;
2624                 }
2625                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2626                 if (ban_state->count < 20) {
2627                         continue;
2628                 }
2629                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2630                         ctdb->nodes[i]->pnn, ban_state->count,
2631                         ctdb->tunable.recovery_ban_period));
2632                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2633                 ban_state->count = 0;
2634         }
2635
2636         /* get relevant tunables */
2637         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2638         if (ret != 0) {
2639                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2640                 goto again;
2641         }
2642
2643         /* get the current recovery lock file from the server */
2644         if (update_recovery_lock_file(ctdb) != 0) {
2645                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2646                 goto again;
2647         }
2648
2649         /* Make sure that if recovery lock verification becomes disabled when
2650            we close the file
2651         */
2652         if (ctdb->tunable.verify_recovery_lock == 0) {
2653                 if (ctdb->recovery_lock_fd != -1) {
2654                         close(ctdb->recovery_lock_fd);
2655                         ctdb->recovery_lock_fd = -1;
2656                 }
2657         }
2658
2659         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2660         if (pnn == (uint32_t)-1) {
2661                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2662                 goto again;
2663         }
2664
2665         /* get the vnnmap */
2666         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2667         if (ret != 0) {
2668                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2669                 goto again;
2670         }
2671
2672
2673         /* get number of nodes */
2674         if (rec->nodemap) {
2675                 talloc_free(rec->nodemap);
2676                 rec->nodemap = NULL;
2677                 nodemap=NULL;
2678         }
2679         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2680         if (ret != 0) {
2681                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2682                 goto again;
2683         }
2684         nodemap = rec->nodemap;
2685
2686         /* check which node is the recovery master */
2687         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2688         if (ret != 0) {
2689                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2690                 goto again;
2691         }
2692
2693         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2694         if (rec->recmaster != pnn) {
2695                 if (rec->ip_reallocate_ctx != NULL) {
2696                         talloc_free(rec->ip_reallocate_ctx);
2697                         rec->ip_reallocate_ctx = NULL;
2698                         rec->reallocate_callers = NULL;
2699                 }
2700         }
2701         /* if there are takeovers requested, perform it and notify the waiters */
2702         if (rec->reallocate_callers) {
2703                 process_ipreallocate_requests(ctdb, rec);
2704         }
2705
2706         if (rec->recmaster == (uint32_t)-1) {
2707                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2708                 force_election(rec, pnn, nodemap);
2709                 goto again;
2710         }
2711
2712
2713         /* if the local daemon is STOPPED, we verify that the databases are
2714            also frozen and thet the recmode is set to active 
2715         */
2716         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2717                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2718                 if (ret != 0) {
2719                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2720                 }
2721                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2722                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2723
2724                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2725                         if (ret != 0) {
2726                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2727                                 goto again;
2728                         }
2729                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2730                         if (ret != 0) {
2731                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2732
2733                                 goto again;
2734                         }
2735                         goto again;
2736                 }
2737         }
2738         /* If the local node is stopped, verify we are not the recmaster 
2739            and yield this role if so
2740         */
2741         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2742                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2743                 force_election(rec, pnn, nodemap);
2744                 goto again;
2745         }
2746         
2747         /* check that we (recovery daemon) and the local ctdb daemon
2748            agrees on whether we are banned or not
2749         */
2750 //qqq
2751
2752         /* remember our own node flags */
2753         rec->node_flags = nodemap->nodes[pnn].flags;
2754
2755         /* count how many active nodes there are */
2756         rec->num_active    = 0;
2757         rec->num_connected = 0;
2758         for (i=0; i<nodemap->num; i++) {
2759                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2760                         rec->num_active++;
2761                 }
2762                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2763                         rec->num_connected++;
2764                 }
2765         }
2766
2767
2768         /* verify that the recmaster node is still active */
2769         for (j=0; j<nodemap->num; j++) {
2770                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2771                         break;
2772                 }
2773         }
2774
2775         if (j == nodemap->num) {
2776                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2777                 force_election(rec, pnn, nodemap);
2778                 goto again;
2779         }
2780
2781         /* if recovery master is disconnected we must elect a new recmaster */
2782         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2783                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2784                 force_election(rec, pnn, nodemap);
2785                 goto again;
2786         }
2787
2788         /* grap the nodemap from the recovery master to check if it is banned */
2789         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2790                                    mem_ctx, &recmaster_nodemap);
2791         if (ret != 0) {
2792                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2793                           nodemap->nodes[j].pnn));
2794                 goto again;
2795         }
2796
2797
2798         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2799                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2800                 force_election(rec, pnn, nodemap);
2801                 goto again;
2802         }
2803
2804
2805         /* verify that we have all ip addresses we should have and we dont
2806          * have addresses we shouldnt have.
2807          */ 
2808         if (ctdb->do_checkpublicip) {
2809                 if (rec->ip_check_disable_ctx == NULL) {
2810                         if (verify_ip_allocation(ctdb, pnn) != 0) {
2811                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2812                                 goto again;
2813                         }
2814                 }
2815         }
2816
2817
2818         /* if we are not the recmaster then we do not need to check
2819            if recovery is needed
2820          */
2821         if (pnn != rec->recmaster) {
2822                 goto again;
2823         }
2824
2825
2826         /* ensure our local copies of flags are right */
2827         ret = update_local_flags(rec, nodemap);
2828         if (ret == MONITOR_ELECTION_NEEDED) {
2829                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2830                 force_election(rec, pnn, nodemap);
2831                 goto again;
2832         }
2833         if (ret != MONITOR_OK) {
2834                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2835                 goto again;
2836         }
2837
2838         /* update the list of public ips that a node can handle for
2839            all connected nodes
2840         */
2841         if (ctdb->num_nodes != nodemap->num) {
2842                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2843                 reload_nodes_file(ctdb);
2844                 goto again;
2845         }
2846         for (j=0; j<nodemap->num; j++) {
2847                 /* release any existing data */
2848                 if (ctdb->nodes[j]->public_ips) {
2849                         talloc_free(ctdb->nodes[j]->public_ips);
2850                         ctdb->nodes[j]->public_ips = NULL;
2851                 }
2852
2853                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2854                         continue;
2855                 }
2856
2857                 /* grab a new shiny list of public ips from the node */
2858                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2859                         ctdb->nodes[j]->pnn, 
2860                         ctdb->nodes,
2861                         &ctdb->nodes[j]->public_ips)) {
2862                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2863                                 ctdb->nodes[j]->pnn));
2864                         goto again;
2865                 }
2866         }
2867
2868
2869         /* verify that all active nodes agree that we are the recmaster */
2870         switch (verify_recmaster(rec, nodemap, pnn)) {
2871         case MONITOR_RECOVERY_NEEDED:
2872                 /* can not happen */
2873                 goto again;
2874         case MONITOR_ELECTION_NEEDED:
2875                 force_election(rec, pnn, nodemap);
2876                 goto again;
2877         case MONITOR_OK:
2878                 break;
2879         case MONITOR_FAILED:
2880                 goto again;
2881         }
2882
2883
2884         if (rec->need_recovery) {
2885                 /* a previous recovery didn't finish */
2886                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2887                 goto again;             
2888         }
2889
2890         /* verify that all active nodes are in normal mode 
2891            and not in recovery mode 
2892         */
2893         switch (verify_recmode(ctdb, nodemap)) {
2894         case MONITOR_RECOVERY_NEEDED:
2895                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2896                 goto again;
2897         case MONITOR_FAILED:
2898                 goto again;
2899         case MONITOR_ELECTION_NEEDED:
2900                 /* can not happen */
2901         case MONITOR_OK:
2902                 break;
2903         }
2904
2905
2906         if (ctdb->tunable.verify_recovery_lock != 0) {
2907                 /* we should have the reclock - check its not stale */
2908                 ret = check_recovery_lock(ctdb);
2909                 if (ret != 0) {
2910                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2911                         ctdb_set_culprit(rec, ctdb->pnn);
2912                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2913                         goto again;
2914                 }
2915         }
2916
2917         /* get the nodemap for all active remote nodes
2918          */
2919         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2920         if (remote_nodemaps == NULL) {
2921                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2922                 goto again;
2923         }
2924         for(i=0; i<nodemap->num; i++) {
2925                 remote_nodemaps[i] = NULL;
2926         }
2927         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2928                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2929                 goto again;
2930         } 
2931
2932         /* verify that all other nodes have the same nodemap as we have
2933         */
2934         for (j=0; j<nodemap->num; j++) {
2935                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2936                         continue;
2937                 }
2938
2939                 if (remote_nodemaps[j] == NULL) {
2940                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2941                         ctdb_set_culprit(rec, j);
2942
2943                         goto again;
2944                 }
2945
2946                 /* if the nodes disagree on how many nodes there are
2947                    then this is a good reason to try recovery
2948                  */
2949                 if (remote_nodemaps[j]->num != nodemap->num) {
2950                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2951                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2953                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2954                         goto again;
2955                 }
2956
2957                 /* if the nodes disagree on which nodes exist and are
2958                    active, then that is also a good reason to do recovery
2959                  */
2960                 for (i=0;i<nodemap->num;i++) {
2961                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2962                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2963                                           nodemap->nodes[j].pnn, i, 
2964                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2965                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2966                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2967                                             vnnmap);
2968                                 goto again;
2969                         }
2970                 }
2971
2972                 /* verify the flags are consistent
2973                 */
2974                 for (i=0; i<nodemap->num; i++) {
2975                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2976                                 continue;
2977                         }
2978                         
2979                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2980                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2981                                   nodemap->nodes[j].pnn, 
2982                                   nodemap->nodes[i].pnn, 
2983                                   remote_nodemaps[j]->nodes[i].flags,
2984                                   nodemap->nodes[j].flags));
2985                                 if (i == j) {
2986                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2987                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2988                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2989                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2990                                                     vnnmap);
2991                                         goto again;
2992                                 } else {
2993                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2994                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2995                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2996                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2997                                                     vnnmap);
2998                                         goto again;
2999                                 }
3000                         }
3001                 }
3002         }
3003
3004
3005         /* there better be the same number of lmasters in the vnn map
3006            as there are active nodes or we will have to do a recovery
3007          */
3008         if (vnnmap->size != rec->num_active) {
3009                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3010                           vnnmap->size, rec->num_active));
3011                 ctdb_set_culprit(rec, ctdb->pnn);
3012                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3013                 goto again;
3014         }
3015
3016         /* verify that all active nodes in the nodemap also exist in 
3017            the vnnmap.
3018          */
3019         for (j=0; j<nodemap->num; j++) {
3020                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3021                         continue;
3022                 }
3023                 if (nodemap->nodes[j].pnn == pnn) {
3024                         continue;
3025                 }
3026
3027                 for (i=0; i<vnnmap->size; i++) {
3028                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3029                                 break;
3030                         }
3031                 }
3032                 if (i == vnnmap->size) {
3033                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3034                                   nodemap->nodes[j].pnn));
3035                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3036                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3037                         goto again;
3038                 }
3039         }
3040
3041         
3042         /* verify that all other nodes have the same vnnmap
3043            and are from the same generation
3044          */
3045         for (j=0; j<nodemap->num; j++) {
3046                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3047                         continue;
3048                 }
3049                 if (nodemap->nodes[j].pnn == pnn) {
3050                         continue;
3051                 }
3052
3053                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3054                                           mem_ctx, &remote_vnnmap);
3055                 if (ret != 0) {
3056                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3057                                   nodemap->nodes[j].pnn));
3058                         goto again;
3059                 }
3060
3061                 /* verify the vnnmap generation is the same */
3062                 if (vnnmap->generation != remote_vnnmap->generation) {
3063                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3064                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3065                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3066                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3067                         goto again;
3068                 }
3069
3070                 /* verify the vnnmap size is the same */
3071                 if (vnnmap->size != remote_vnnmap->size) {
3072                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3073                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3074                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3075                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3076                         goto again;
3077                 }
3078
3079                 /* verify the vnnmap is the same */
3080                 for (i=0;i<vnnmap->size;i++) {
3081                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3082                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3083                                           nodemap->nodes[j].pnn));
3084                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3085                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3086                                             vnnmap);
3087                                 goto again;
3088                         }
3089                 }
3090         }
3091
3092         /* we might need to change who has what IP assigned */
3093         if (rec->need_takeover_run) {
3094                 rec->need_takeover_run = false;
3095
3096                 /* execute the "startrecovery" event script on all nodes */
3097                 ret = run_startrecovery_eventscript(rec, nodemap);
3098                 if (ret!=0) {
3099                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3100                         ctdb_set_culprit(rec, ctdb->pnn);
3101                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3102                 }
3103
3104                 ret = ctdb_takeover_run(ctdb, nodemap);
3105                 if (ret != 0) {
3106                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3107                         ctdb_set_culprit(rec, ctdb->pnn);
3108                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3109                 }
3110
3111                 /* execute the "recovered" event script on all nodes */
3112                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3113 #if 0
3114 // we cant check whether the event completed successfully
3115 // since this script WILL fail if the node is in recovery mode
3116 // and if that race happens, the code here would just cause a second
3117 // cascading recovery.
3118                 if (ret!=0) {
3119                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3120                         ctdb_set_culprit(rec, ctdb->pnn);
3121                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3122                 }
3123 #endif
3124         }
3125
3126
3127         goto again;
3128
3129 }
3130
3131 /*
3132   event handler for when the main ctdbd dies
3133  */
3134 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3135                                  uint16_t flags, void *private_data)
3136 {
3137         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3138         _exit(1);
3139 }
3140
3141 /*
3142   called regularly to verify that the recovery daemon is still running
3143  */
3144 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3145                               struct timeval yt, void *p)
3146 {
3147         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3148
3149         if (kill(ctdb->recoverd_pid, 0) != 0) {
3150                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3151
3152                 ctdb_stop_recoverd(ctdb);
3153                 ctdb_stop_keepalive(ctdb);
3154                 ctdb_stop_monitoring(ctdb);
3155                 ctdb_release_all_ips(ctdb);
3156                 if (ctdb->methods != NULL) {
3157                         ctdb->methods->shutdown(ctdb);
3158                 }
3159                 ctdb_event_script(ctdb, "shutdown");
3160
3161                 exit(10);       
3162         }
3163
3164         event_add_timed(ctdb->ev, ctdb, 
3165                         timeval_current_ofs(30, 0),
3166                         ctdb_check_recd, ctdb);
3167 }
3168
3169 static void recd_sig_child_handler(struct event_context *ev,
3170         struct signal_event *se, int signum, int count,
3171         void *dont_care, 
3172         void *private_data)
3173 {
3174 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3175         int status;
3176         pid_t pid = -1;
3177
3178         while (pid != 0) {
3179                 pid = waitpid(-1, &status, WNOHANG);
3180                 if (pid == -1) {
3181                         if (errno != ECHILD) {
3182                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3183                         }
3184                         return;
3185                 }
3186                 if (pid > 0) {
3187                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3188                 }
3189         }
3190 }
3191
3192 /*
3193   startup the recovery daemon as a child of the main ctdb daemon
3194  */
3195 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3196 {
3197         int fd[2];
3198         struct signal_event *se;
3199
3200         if (pipe(fd) != 0) {
3201                 return -1;
3202         }
3203
3204         ctdb->ctdbd_pid = getpid();
3205
3206         ctdb->recoverd_pid = fork();
3207         if (ctdb->recoverd_pid == -1) {
3208                 return -1;
3209         }
3210         
3211         if (ctdb->recoverd_pid != 0) {
3212                 close(fd[0]);
3213                 event_add_timed(ctdb->ev, ctdb, 
3214                                 timeval_current_ofs(30, 0),
3215                                 ctdb_check_recd, ctdb);
3216                 return 0;
3217         }
3218
3219         close(fd[1]);
3220
3221         srandom(getpid() ^ time(NULL));
3222
3223         if (switch_from_server_to_client(ctdb) != 0) {
3224                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3225                 exit(1);
3226         }
3227
3228         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3229                      ctdb_recoverd_parent, &fd[0]);     
3230
3231         /* set up a handler to pick up sigchld */
3232         se = event_add_signal(ctdb->ev, ctdb,
3233                                      SIGCHLD, 0,
3234                                      recd_sig_child_handler,
3235                                      ctdb);
3236         if (se == NULL) {
3237                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3238                 exit(1);
3239         }
3240
3241         monitor_cluster(ctdb);
3242
3243         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3244         return -1;
3245 }
3246
3247 /*
3248   shutdown the recovery daemon
3249  */
3250 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3251 {
3252         if (ctdb->recoverd_pid == 0) {
3253                 return;
3254         }
3255
3256         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3257         kill(ctdb->recoverd_pid, SIGTERM);
3258 }