during recovery, update all remote nodes so they use the same priorities
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit > ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, CONTROL_TIMEOUT(),
236                                         false, tdb_null,
237                                         async_getcap_callback, NULL,
238                                         NULL) != 0) {
239                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
240                 talloc_free(tmp_ctx);
241                 return -1;
242         }
243
244         talloc_free(tmp_ctx);
245         return 0;
246 }
247
248 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
249 {
250         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
251
252         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
253         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
254 }
255
256 /*
257   change recovery mode on all nodes
258  */
259 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
260 {
261         TDB_DATA data;
262         uint32_t *nodes;
263         TALLOC_CTX *tmp_ctx;
264
265         tmp_ctx = talloc_new(ctdb);
266         CTDB_NO_MEMORY(ctdb, tmp_ctx);
267
268         /* freeze all nodes */
269         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
270         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
271                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
272                                                 nodes, CONTROL_TIMEOUT(),
273                                                 false, tdb_null,
274                                                 NULL,
275                                                 set_recmode_fail_callback,
276                                                 rec) != 0) {
277                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
278                         talloc_free(tmp_ctx);
279                         return -1;
280                 }
281         }
282
283
284         data.dsize = sizeof(uint32_t);
285         data.dptr = (unsigned char *)&rec_mode;
286
287         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
288                                         nodes, CONTROL_TIMEOUT(),
289                                         false, data,
290                                         NULL, NULL,
291                                         NULL) != 0) {
292                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
293                 talloc_free(tmp_ctx);
294                 return -1;
295         }
296
297         talloc_free(tmp_ctx);
298         return 0;
299 }
300
301 /*
302   change recovery master on all node
303  */
304 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
305 {
306         TDB_DATA data;
307         TALLOC_CTX *tmp_ctx;
308         uint32_t *nodes;
309
310         tmp_ctx = talloc_new(ctdb);
311         CTDB_NO_MEMORY(ctdb, tmp_ctx);
312
313         data.dsize = sizeof(uint32_t);
314         data.dptr = (unsigned char *)&pnn;
315
316         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
317         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
318                                         nodes,
319                                         CONTROL_TIMEOUT(), false, data,
320                                         NULL, NULL,
321                                         NULL) != 0) {
322                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
323                 talloc_free(tmp_ctx);
324                 return -1;
325         }
326
327         talloc_free(tmp_ctx);
328         return 0;
329 }
330
331 /* update all remote nodes to use the same db priority that we have
332    this can fail if the remove node has not yet been upgraded to 
333    support this function, so we always return success and never fail
334    a recovery if this call fails.
335 */
336 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
337         struct ctdb_node_map *nodemap, 
338         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
339 {
340         int db;
341         uint32_t *nodes;
342
343         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
344
345         /* step through all local databases */
346         for (db=0; db<dbmap->num;db++) {
347                 TDB_DATA data;
348                 struct ctdb_db_priority db_prio;
349                 int ret;
350
351                 db_prio.db_id     = dbmap->dbs[db].dbid;
352                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
353                 if (ret != 0) {
354                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
355                         continue;
356                 }
357
358                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
359
360                 data.dptr  = (uint8_t *)&db_prio;
361                 data.dsize = sizeof(db_prio);
362
363                 if (ctdb_client_async_control(ctdb,
364                                         CTDB_CONTROL_SET_DB_PRIORITY,
365                                         nodes,
366                                         CONTROL_TIMEOUT(), false, data,
367                                         NULL, NULL,
368                                         NULL) != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
370                 }
371         }
372
373         return 0;
374 }                       
375
376 /*
377   ensure all other nodes have attached to any databases that we have
378  */
379 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
380                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
381 {
382         int i, j, db, ret;
383         struct ctdb_dbid_map *remote_dbmap;
384
385         /* verify that all other nodes have all our databases */
386         for (j=0; j<nodemap->num; j++) {
387                 /* we dont need to ourself ourselves */
388                 if (nodemap->nodes[j].pnn == pnn) {
389                         continue;
390                 }
391                 /* dont check nodes that are unavailable */
392                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
393                         continue;
394                 }
395
396                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
397                                          mem_ctx, &remote_dbmap);
398                 if (ret != 0) {
399                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
400                         return -1;
401                 }
402
403                 /* step through all local databases */
404                 for (db=0; db<dbmap->num;db++) {
405                         const char *name;
406
407
408                         for (i=0;i<remote_dbmap->num;i++) {
409                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
410                                         break;
411                                 }
412                         }
413                         /* the remote node already have this database */
414                         if (i!=remote_dbmap->num) {
415                                 continue;
416                         }
417                         /* ok so we need to create this database */
418                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
419                                             mem_ctx, &name);
420                         if (ret != 0) {
421                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
422                                 return -1;
423                         }
424                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
425                                            mem_ctx, name, dbmap->dbs[db].persistent);
426                         if (ret != 0) {
427                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
428                                 return -1;
429                         }
430                 }
431         }
432
433         return 0;
434 }
435
436
437 /*
438   ensure we are attached to any databases that anyone else is attached to
439  */
440 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
441                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
442 {
443         int i, j, db, ret;
444         struct ctdb_dbid_map *remote_dbmap;
445
446         /* verify that we have all database any other node has */
447         for (j=0; j<nodemap->num; j++) {
448                 /* we dont need to ourself ourselves */
449                 if (nodemap->nodes[j].pnn == pnn) {
450                         continue;
451                 }
452                 /* dont check nodes that are unavailable */
453                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
454                         continue;
455                 }
456
457                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
458                                          mem_ctx, &remote_dbmap);
459                 if (ret != 0) {
460                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
461                         return -1;
462                 }
463
464                 /* step through all databases on the remote node */
465                 for (db=0; db<remote_dbmap->num;db++) {
466                         const char *name;
467
468                         for (i=0;i<(*dbmap)->num;i++) {
469                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
470                                         break;
471                                 }
472                         }
473                         /* we already have this db locally */
474                         if (i!=(*dbmap)->num) {
475                                 continue;
476                         }
477                         /* ok so we need to create this database and
478                            rebuild dbmap
479                          */
480                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
481                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
482                         if (ret != 0) {
483                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
484                                           nodemap->nodes[j].pnn));
485                                 return -1;
486                         }
487                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
488                                            remote_dbmap->dbs[db].persistent);
489                         if (ret != 0) {
490                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
491                                 return -1;
492                         }
493                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
494                         if (ret != 0) {
495                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
496                                 return -1;
497                         }
498                 }
499         }
500
501         return 0;
502 }
503
504
505 /*
506   pull the remote database contents from one node into the recdb
507  */
508 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
509                                     struct tdb_wrap *recdb, uint32_t dbid)
510 {
511         int ret;
512         TDB_DATA outdata;
513         struct ctdb_marshall_buffer *reply;
514         struct ctdb_rec_data *rec;
515         int i;
516         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
517
518         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
519                                CONTROL_TIMEOUT(), &outdata);
520         if (ret != 0) {
521                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
522                 talloc_free(tmp_ctx);
523                 return -1;
524         }
525
526         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
527
528         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
529                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
530                 talloc_free(tmp_ctx);
531                 return -1;
532         }
533         
534         rec = (struct ctdb_rec_data *)&reply->data[0];
535         
536         for (i=0;
537              i<reply->count;
538              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
539                 TDB_DATA key, data;
540                 struct ctdb_ltdb_header *hdr;
541                 TDB_DATA existing;
542                 
543                 key.dptr = &rec->data[0];
544                 key.dsize = rec->keylen;
545                 data.dptr = &rec->data[key.dsize];
546                 data.dsize = rec->datalen;
547                 
548                 hdr = (struct ctdb_ltdb_header *)data.dptr;
549
550                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
551                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
552                         talloc_free(tmp_ctx);
553                         return -1;
554                 }
555
556                 /* fetch the existing record, if any */
557                 existing = tdb_fetch(recdb->tdb, key);
558                 
559                 if (existing.dptr != NULL) {
560                         struct ctdb_ltdb_header header;
561                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
562                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
563                                          (unsigned)existing.dsize, srcnode));
564                                 free(existing.dptr);
565                                 talloc_free(tmp_ctx);
566                                 return -1;
567                         }
568                         header = *(struct ctdb_ltdb_header *)existing.dptr;
569                         free(existing.dptr);
570                         if (!(header.rsn < hdr->rsn ||
571                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
572                                 continue;
573                         }
574                 }
575                 
576                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
577                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
578                         talloc_free(tmp_ctx);
579                         return -1;                              
580                 }
581         }
582
583         talloc_free(tmp_ctx);
584
585         return 0;
586 }
587
588 /*
589   pull all the remote database contents into the recdb
590  */
591 static int pull_remote_database(struct ctdb_context *ctdb,
592                                 struct ctdb_recoverd *rec, 
593                                 struct ctdb_node_map *nodemap, 
594                                 struct tdb_wrap *recdb, uint32_t dbid)
595 {
596         int j;
597
598         /* pull all records from all other nodes across onto this node
599            (this merges based on rsn)
600         */
601         for (j=0; j<nodemap->num; j++) {
602                 /* dont merge from nodes that are unavailable */
603                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
604                         continue;
605                 }
606                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
607                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
608                                  nodemap->nodes[j].pnn));
609                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
610                         return -1;
611                 }
612         }
613         
614         return 0;
615 }
616
617
618 /*
619   update flags on all active nodes
620  */
621 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
622 {
623         int ret;
624
625         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
626                 if (ret != 0) {
627                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
628                 return -1;
629         }
630
631         return 0;
632 }
633
634 /*
635   ensure all nodes have the same vnnmap we do
636  */
637 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
638                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
639 {
640         int j, ret;
641
642         /* push the new vnn map out to all the nodes */
643         for (j=0; j<nodemap->num; j++) {
644                 /* dont push to nodes that are unavailable */
645                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
646                         continue;
647                 }
648
649                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
650                 if (ret != 0) {
651                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
652                         return -1;
653                 }
654         }
655
656         return 0;
657 }
658
659
660 struct vacuum_info {
661         struct vacuum_info *next, *prev;
662         struct ctdb_recoverd *rec;
663         uint32_t srcnode;
664         struct ctdb_db_context *ctdb_db;
665         struct ctdb_marshall_buffer *recs;
666         struct ctdb_rec_data *r;
667 };
668
669 static void vacuum_fetch_next(struct vacuum_info *v);
670
671 /*
672   called when a vacuum fetch has completed - just free it and do the next one
673  */
674 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
675 {
676         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
677         talloc_free(state);
678         vacuum_fetch_next(v);
679 }
680
681
682 /*
683   process the next element from the vacuum list
684 */
685 static void vacuum_fetch_next(struct vacuum_info *v)
686 {
687         struct ctdb_call call;
688         struct ctdb_rec_data *r;
689
690         while (v->recs->count) {
691                 struct ctdb_client_call_state *state;
692                 TDB_DATA data;
693                 struct ctdb_ltdb_header *hdr;
694
695                 ZERO_STRUCT(call);
696                 call.call_id = CTDB_NULL_FUNC;
697                 call.flags = CTDB_IMMEDIATE_MIGRATION;
698
699                 r = v->r;
700                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
701                 v->recs->count--;
702
703                 call.key.dptr = &r->data[0];
704                 call.key.dsize = r->keylen;
705
706                 /* ensure we don't block this daemon - just skip a record if we can't get
707                    the chainlock */
708                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
709                         continue;
710                 }
711
712                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
713                 if (data.dptr == NULL) {
714                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
715                         continue;
716                 }
717
718                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
719                         free(data.dptr);
720                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
721                         continue;
722                 }
723                 
724                 hdr = (struct ctdb_ltdb_header *)data.dptr;
725                 if (hdr->dmaster == v->rec->ctdb->pnn) {
726                         /* its already local */
727                         free(data.dptr);
728                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
729                         continue;
730                 }
731
732                 free(data.dptr);
733
734                 state = ctdb_call_send(v->ctdb_db, &call);
735                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
736                 if (state == NULL) {
737                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
738                         talloc_free(v);
739                         return;
740                 }
741                 state->async.fn = vacuum_fetch_callback;
742                 state->async.private_data = v;
743                 return;
744         }
745
746         talloc_free(v);
747 }
748
749
750 /*
751   destroy a vacuum info structure
752  */
753 static int vacuum_info_destructor(struct vacuum_info *v)
754 {
755         DLIST_REMOVE(v->rec->vacuum_info, v);
756         return 0;
757 }
758
759
760 /*
761   handler for vacuum fetch
762 */
763 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
764                                  TDB_DATA data, void *private_data)
765 {
766         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
767         struct ctdb_marshall_buffer *recs;
768         int ret, i;
769         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
770         const char *name;
771         struct ctdb_dbid_map *dbmap=NULL;
772         bool persistent = false;
773         struct ctdb_db_context *ctdb_db;
774         struct ctdb_rec_data *r;
775         uint32_t srcnode;
776         struct vacuum_info *v;
777
778         recs = (struct ctdb_marshall_buffer *)data.dptr;
779         r = (struct ctdb_rec_data *)&recs->data[0];
780
781         if (recs->count == 0) {
782                 talloc_free(tmp_ctx);
783                 return;
784         }
785
786         srcnode = r->reqid;
787
788         for (v=rec->vacuum_info;v;v=v->next) {
789                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
790                         /* we're already working on records from this node */
791                         talloc_free(tmp_ctx);
792                         return;
793                 }
794         }
795
796         /* work out if the database is persistent */
797         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
798         if (ret != 0) {
799                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         for (i=0;i<dbmap->num;i++) {
805                 if (dbmap->dbs[i].dbid == recs->db_id) {
806                         persistent = dbmap->dbs[i].persistent;
807                         break;
808                 }
809         }
810         if (i == dbmap->num) {
811                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
812                 talloc_free(tmp_ctx);
813                 return;         
814         }
815
816         /* find the name of this database */
817         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
818                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
819                 talloc_free(tmp_ctx);
820                 return;
821         }
822
823         /* attach to it */
824         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
825         if (ctdb_db == NULL) {
826                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
827                 talloc_free(tmp_ctx);
828                 return;
829         }
830
831         v = talloc_zero(rec, struct vacuum_info);
832         if (v == NULL) {
833                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
834                 talloc_free(tmp_ctx);
835                 return;
836         }
837
838         v->rec = rec;
839         v->srcnode = srcnode;
840         v->ctdb_db = ctdb_db;
841         v->recs = talloc_memdup(v, recs, data.dsize);
842         if (v->recs == NULL) {
843                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
844                 talloc_free(v);
845                 talloc_free(tmp_ctx);
846                 return;         
847         }
848         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
849
850         DLIST_ADD(rec->vacuum_info, v);
851
852         talloc_set_destructor(v, vacuum_info_destructor);
853
854         vacuum_fetch_next(v);
855         talloc_free(tmp_ctx);
856 }
857
858
859 /*
860   called when ctdb_wait_timeout should finish
861  */
862 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
863                               struct timeval yt, void *p)
864 {
865         uint32_t *timed_out = (uint32_t *)p;
866         (*timed_out) = 1;
867 }
868
869 /*
870   wait for a given number of seconds
871  */
872 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
873 {
874         uint32_t timed_out = 0;
875         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
876         while (!timed_out) {
877                 event_loop_once(ctdb->ev);
878         }
879 }
880
881 /*
882   called when an election times out (ends)
883  */
884 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
885                                   struct timeval t, void *p)
886 {
887         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
888         rec->election_timeout = NULL;
889
890         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
891 }
892
893
894 /*
895   wait for an election to finish. It finished election_timeout seconds after
896   the last election packet is received
897  */
898 static void ctdb_wait_election(struct ctdb_recoverd *rec)
899 {
900         struct ctdb_context *ctdb = rec->ctdb;
901         while (rec->election_timeout) {
902                 event_loop_once(ctdb->ev);
903         }
904 }
905
906 /*
907   Update our local flags from all remote connected nodes. 
908   This is only run when we are or we belive we are the recovery master
909  */
910 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
911 {
912         int j;
913         struct ctdb_context *ctdb = rec->ctdb;
914         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
915
916         /* get the nodemap for all active remote nodes and verify
917            they are the same as for this node
918          */
919         for (j=0; j<nodemap->num; j++) {
920                 struct ctdb_node_map *remote_nodemap=NULL;
921                 int ret;
922
923                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
924                         continue;
925                 }
926                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
927                         continue;
928                 }
929
930                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
931                                            mem_ctx, &remote_nodemap);
932                 if (ret != 0) {
933                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
934                                   nodemap->nodes[j].pnn));
935                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
936                         talloc_free(mem_ctx);
937                         return MONITOR_FAILED;
938                 }
939                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
940                         /* We should tell our daemon about this so it
941                            updates its flags or else we will log the same 
942                            message again in the next iteration of recovery.
943                            Since we are the recovery master we can just as
944                            well update the flags on all nodes.
945                         */
946                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
947                         if (ret != 0) {
948                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
949                                 return -1;
950                         }
951
952                         /* Update our local copy of the flags in the recovery
953                            daemon.
954                         */
955                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
956                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
957                                  nodemap->nodes[j].flags));
958                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
959                 }
960                 talloc_free(remote_nodemap);
961         }
962         talloc_free(mem_ctx);
963         return MONITOR_OK;
964 }
965
966
967 /* Create a new random generation ip. 
968    The generation id can not be the INVALID_GENERATION id
969 */
970 static uint32_t new_generation(void)
971 {
972         uint32_t generation;
973
974         while (1) {
975                 generation = random();
976
977                 if (generation != INVALID_GENERATION) {
978                         break;
979                 }
980         }
981
982         return generation;
983 }
984
985
986 /*
987   create a temporary working database
988  */
989 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
990 {
991         char *name;
992         struct tdb_wrap *recdb;
993         unsigned tdb_flags;
994
995         /* open up the temporary recovery database */
996         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
997         if (name == NULL) {
998                 return NULL;
999         }
1000         unlink(name);
1001
1002         tdb_flags = TDB_NOLOCK;
1003         if (!ctdb->do_setsched) {
1004                 tdb_flags |= TDB_NOMMAP;
1005         }
1006
1007         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1008                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1009         if (recdb == NULL) {
1010                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1011         }
1012
1013         talloc_free(name);
1014
1015         return recdb;
1016 }
1017
1018
1019 /* 
1020    a traverse function for pulling all relevent records from recdb
1021  */
1022 struct recdb_data {
1023         struct ctdb_context *ctdb;
1024         struct ctdb_marshall_buffer *recdata;
1025         uint32_t len;
1026         bool failed;
1027 };
1028
1029 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1030 {
1031         struct recdb_data *params = (struct recdb_data *)p;
1032         struct ctdb_rec_data *rec;
1033         struct ctdb_ltdb_header *hdr;
1034
1035         /* skip empty records */
1036         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1037                 return 0;
1038         }
1039
1040         /* update the dmaster field to point to us */
1041         hdr = (struct ctdb_ltdb_header *)data.dptr;
1042         hdr->dmaster = params->ctdb->pnn;
1043
1044         /* add the record to the blob ready to send to the nodes */
1045         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1046         if (rec == NULL) {
1047                 params->failed = true;
1048                 return -1;
1049         }
1050         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1051         if (params->recdata == NULL) {
1052                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1053                          rec->length + params->len, params->recdata->count));
1054                 params->failed = true;
1055                 return -1;
1056         }
1057         params->recdata->count++;
1058         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1059         params->len += rec->length;
1060         talloc_free(rec);
1061
1062         return 0;
1063 }
1064
1065 /*
1066   push the recdb database out to all nodes
1067  */
1068 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1069                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1070 {
1071         struct recdb_data params;
1072         struct ctdb_marshall_buffer *recdata;
1073         TDB_DATA outdata;
1074         TALLOC_CTX *tmp_ctx;
1075         uint32_t *nodes;
1076
1077         tmp_ctx = talloc_new(ctdb);
1078         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1079
1080         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1081         CTDB_NO_MEMORY(ctdb, recdata);
1082
1083         recdata->db_id = dbid;
1084
1085         params.ctdb = ctdb;
1086         params.recdata = recdata;
1087         params.len = offsetof(struct ctdb_marshall_buffer, data);
1088         params.failed = false;
1089
1090         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1091                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1092                 talloc_free(params.recdata);
1093                 talloc_free(tmp_ctx);
1094                 return -1;
1095         }
1096
1097         if (params.failed) {
1098                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1099                 talloc_free(params.recdata);
1100                 talloc_free(tmp_ctx);
1101                 return -1;              
1102         }
1103
1104         recdata = params.recdata;
1105
1106         outdata.dptr = (void *)recdata;
1107         outdata.dsize = params.len;
1108
1109         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1110         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1111                                         nodes,
1112                                         CONTROL_TIMEOUT(), false, outdata,
1113                                         NULL, NULL,
1114                                         NULL) != 0) {
1115                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1116                 talloc_free(recdata);
1117                 talloc_free(tmp_ctx);
1118                 return -1;
1119         }
1120
1121         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1122                   dbid, recdata->count));
1123
1124         talloc_free(recdata);
1125         talloc_free(tmp_ctx);
1126
1127         return 0;
1128 }
1129
1130
1131 /*
1132   go through a full recovery on one database 
1133  */
1134 static int recover_database(struct ctdb_recoverd *rec, 
1135                             TALLOC_CTX *mem_ctx,
1136                             uint32_t dbid,
1137                             uint32_t pnn, 
1138                             struct ctdb_node_map *nodemap,
1139                             uint32_t transaction_id)
1140 {
1141         struct tdb_wrap *recdb;
1142         int ret;
1143         struct ctdb_context *ctdb = rec->ctdb;
1144         TDB_DATA data;
1145         struct ctdb_control_wipe_database w;
1146         uint32_t *nodes;
1147
1148         recdb = create_recdb(ctdb, mem_ctx);
1149         if (recdb == NULL) {
1150                 return -1;
1151         }
1152
1153         /* pull all remote databases onto the recdb */
1154         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1155         if (ret != 0) {
1156                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1157                 return -1;
1158         }
1159
1160         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1161
1162         /* wipe all the remote databases. This is safe as we are in a transaction */
1163         w.db_id = dbid;
1164         w.transaction_id = transaction_id;
1165
1166         data.dptr = (void *)&w;
1167         data.dsize = sizeof(w);
1168
1169         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1170         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1171                                         nodes,
1172                                         CONTROL_TIMEOUT(), false, data,
1173                                         NULL, NULL,
1174                                         NULL) != 0) {
1175                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1176                 talloc_free(recdb);
1177                 return -1;
1178         }
1179         
1180         /* push out the correct database. This sets the dmaster and skips 
1181            the empty records */
1182         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1183         if (ret != 0) {
1184                 talloc_free(recdb);
1185                 return -1;
1186         }
1187
1188         /* all done with this database */
1189         talloc_free(recdb);
1190
1191         return 0;
1192 }
1193
1194 /*
1195   reload the nodes file 
1196 */
1197 static void reload_nodes_file(struct ctdb_context *ctdb)
1198 {
1199         ctdb->nodes = NULL;
1200         ctdb_load_nodes_file(ctdb);
1201 }
1202
1203         
1204 /*
1205   we are the recmaster, and recovery is needed - start a recovery run
1206  */
1207 static int do_recovery(struct ctdb_recoverd *rec, 
1208                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1209                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1210 {
1211         struct ctdb_context *ctdb = rec->ctdb;
1212         int i, j, ret;
1213         uint32_t generation;
1214         struct ctdb_dbid_map *dbmap;
1215         TDB_DATA data;
1216         uint32_t *nodes;
1217         struct timeval start_time;
1218
1219         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1220
1221         /* if recovery fails, force it again */
1222         rec->need_recovery = true;
1223
1224         for (i=0; i<ctdb->num_nodes; i++) {
1225                 struct ctdb_banning_state *ban_state;
1226
1227                 if (ctdb->nodes[i]->ban_state == NULL) {
1228                         continue;
1229                 }
1230                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1231                 if (ban_state->count < 2*ctdb->num_nodes) {
1232                         continue;
1233                 }
1234                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1235                         ctdb->nodes[i]->pnn, ban_state->count,
1236                         ctdb->tunable.recovery_ban_period));
1237                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1238                 ban_state->count = 0;
1239         }
1240
1241
1242         if (ctdb->tunable.verify_recovery_lock != 0) {
1243                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1244                 start_time = timeval_current();
1245                 if (!ctdb_recovery_lock(ctdb, true)) {
1246                         ctdb_set_culprit(rec, pnn);
1247                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1248                         return -1;
1249                 }
1250                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1251                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1252         }
1253
1254         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1255
1256         /* get a list of all databases */
1257         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1258         if (ret != 0) {
1259                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1260                 return -1;
1261         }
1262
1263         /* we do the db creation before we set the recovery mode, so the freeze happens
1264            on all databases we will be dealing with. */
1265
1266         /* verify that we have all the databases any other node has */
1267         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1268         if (ret != 0) {
1269                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1270                 return -1;
1271         }
1272
1273         /* verify that all other nodes have all our databases */
1274         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1275         if (ret != 0) {
1276                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1277                 return -1;
1278         }
1279         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1280
1281         /* update the database priority for all remote databases */
1282         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1285         }
1286         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1287
1288
1289         /* set recovery mode to active on all nodes */
1290         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1291         if (ret != 0) {
1292                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1293                 return -1;
1294         }
1295
1296         /* execute the "startrecovery" event script on all nodes */
1297         ret = run_startrecovery_eventscript(rec, nodemap);
1298         if (ret!=0) {
1299                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1300                 return -1;
1301         }
1302
1303         /* pick a new generation number */
1304         generation = new_generation();
1305
1306         /* change the vnnmap on this node to use the new generation 
1307            number but not on any other nodes.
1308            this guarantees that if we abort the recovery prematurely
1309            for some reason (a node stops responding?)
1310            that we can just return immediately and we will reenter
1311            recovery shortly again.
1312            I.e. we deliberately leave the cluster with an inconsistent
1313            generation id to allow us to abort recovery at any stage and
1314            just restart it from scratch.
1315          */
1316         vnnmap->generation = generation;
1317         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1318         if (ret != 0) {
1319                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1320                 return -1;
1321         }
1322
1323         data.dptr = (void *)&generation;
1324         data.dsize = sizeof(uint32_t);
1325
1326         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1327         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1328                                         nodes,
1329                                         CONTROL_TIMEOUT(), false, data,
1330                                         NULL, NULL,
1331                                         NULL) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1333                 return -1;
1334         }
1335
1336         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1337
1338         for (i=0;i<dbmap->num;i++) {
1339                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1340                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1341                         return -1;
1342                 }
1343         }
1344
1345         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1346
1347         /* commit all the changes */
1348         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1349                                         nodes,
1350                                         CONTROL_TIMEOUT(), false, data,
1351                                         NULL, NULL,
1352                                         NULL) != 0) {
1353                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1354                 return -1;
1355         }
1356
1357         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1358         
1359
1360         /* update the capabilities for all nodes */
1361         ret = update_capabilities(ctdb, nodemap);
1362         if (ret!=0) {
1363                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1364                 return -1;
1365         }
1366
1367         /* build a new vnn map with all the currently active and
1368            unbanned nodes */
1369         generation = new_generation();
1370         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1371         CTDB_NO_MEMORY(ctdb, vnnmap);
1372         vnnmap->generation = generation;
1373         vnnmap->size = 0;
1374         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1375         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1376         for (i=j=0;i<nodemap->num;i++) {
1377                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1378                         continue;
1379                 }
1380                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1381                         /* this node can not be an lmaster */
1382                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1383                         continue;
1384                 }
1385
1386                 vnnmap->size++;
1387                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1388                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1389                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1390
1391         }
1392         if (vnnmap->size == 0) {
1393                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1394                 vnnmap->size++;
1395                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1396                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1397                 vnnmap->map[0] = pnn;
1398         }       
1399
1400         /* update to the new vnnmap on all nodes */
1401         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1402         if (ret != 0) {
1403                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1404                 return -1;
1405         }
1406
1407         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1408
1409         /* update recmaster to point to us for all nodes */
1410         ret = set_recovery_master(ctdb, nodemap, pnn);
1411         if (ret!=0) {
1412                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1413                 return -1;
1414         }
1415
1416         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1417
1418         /*
1419           update all nodes to have the same flags that we have
1420          */
1421         for (i=0;i<nodemap->num;i++) {
1422                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1423                         continue;
1424                 }
1425
1426                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1427                 if (ret != 0) {
1428                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1429                         return -1;
1430                 }
1431         }
1432
1433         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1434
1435         /* disable recovery mode */
1436         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1437         if (ret != 0) {
1438                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1439                 return -1;
1440         }
1441
1442         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1443
1444         /*
1445           tell nodes to takeover their public IPs
1446          */
1447         rec->need_takeover_run = false;
1448         ret = ctdb_takeover_run(ctdb, nodemap);
1449         if (ret != 0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1451                 return -1;
1452         }
1453         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1454
1455         /* execute the "recovered" event script on all nodes */
1456         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1457         if (ret!=0) {
1458                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1459                 return -1;
1460         }
1461
1462         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1463
1464         /* send a message to all clients telling them that the cluster 
1465            has been reconfigured */
1466         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1467
1468         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1469
1470         rec->need_recovery = false;
1471
1472         /* we managed to complete a full recovery, make sure to forgive
1473            any past sins by the nodes that could now participate in the
1474            recovery.
1475         */
1476         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1477         for (i=0;i<nodemap->num;i++) {
1478                 struct ctdb_banning_state *ban_state;
1479
1480                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1481                         continue;
1482                 }
1483
1484                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1485                 if (ban_state == NULL) {
1486                         continue;
1487                 }
1488
1489                 ban_state->count = 0;
1490         }
1491
1492
1493         /* We just finished a recovery successfully. 
1494            We now wait for rerecovery_timeout before we allow 
1495            another recovery to take place.
1496         */
1497         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1498         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1499         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1500
1501         return 0;
1502 }
1503
1504
1505 /*
1506   elections are won by first checking the number of connected nodes, then
1507   the priority time, then the pnn
1508  */
1509 struct election_message {
1510         uint32_t num_connected;
1511         struct timeval priority_time;
1512         uint32_t pnn;
1513         uint32_t node_flags;
1514 };
1515
1516 /*
1517   form this nodes election data
1518  */
1519 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1520 {
1521         int ret, i;
1522         struct ctdb_node_map *nodemap;
1523         struct ctdb_context *ctdb = rec->ctdb;
1524
1525         ZERO_STRUCTP(em);
1526
1527         em->pnn = rec->ctdb->pnn;
1528         em->priority_time = rec->priority_time;
1529
1530         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1531         if (ret != 0) {
1532                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1533                 return;
1534         }
1535
1536         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1537         em->node_flags = rec->node_flags;
1538
1539         for (i=0;i<nodemap->num;i++) {
1540                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1541                         em->num_connected++;
1542                 }
1543         }
1544
1545         /* we shouldnt try to win this election if we cant be a recmaster */
1546         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1547                 em->num_connected = 0;
1548                 em->priority_time = timeval_current();
1549         }
1550
1551         talloc_free(nodemap);
1552 }
1553
1554 /*
1555   see if the given election data wins
1556  */
1557 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1558 {
1559         struct election_message myem;
1560         int cmp = 0;
1561
1562         ctdb_election_data(rec, &myem);
1563
1564         /* we cant win if we dont have the recmaster capability */
1565         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1566                 return false;
1567         }
1568
1569         /* we cant win if we are banned */
1570         if (rec->node_flags & NODE_FLAGS_BANNED) {
1571                 return false;
1572         }       
1573
1574         /* we cant win if we are stopped */
1575         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1576                 return false;
1577         }       
1578
1579         /* we will automatically win if the other node is banned */
1580         if (em->node_flags & NODE_FLAGS_BANNED) {
1581                 return true;
1582         }
1583
1584         /* we will automatically win if the other node is banned */
1585         if (em->node_flags & NODE_FLAGS_STOPPED) {
1586                 return true;
1587         }
1588
1589         /* try to use the most connected node */
1590         if (cmp == 0) {
1591                 cmp = (int)myem.num_connected - (int)em->num_connected;
1592         }
1593
1594         /* then the longest running node */
1595         if (cmp == 0) {
1596                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1597         }
1598
1599         if (cmp == 0) {
1600                 cmp = (int)myem.pnn - (int)em->pnn;
1601         }
1602
1603         return cmp > 0;
1604 }
1605
1606 /*
1607   send out an election request
1608  */
1609 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1610 {
1611         int ret;
1612         TDB_DATA election_data;
1613         struct election_message emsg;
1614         uint64_t srvid;
1615         struct ctdb_context *ctdb = rec->ctdb;
1616
1617         srvid = CTDB_SRVID_RECOVERY;
1618
1619         ctdb_election_data(rec, &emsg);
1620
1621         election_data.dsize = sizeof(struct election_message);
1622         election_data.dptr  = (unsigned char *)&emsg;
1623
1624
1625         /* send an election message to all active nodes */
1626         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1627         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1628
1629
1630         /* A new node that is already frozen has entered the cluster.
1631            The existing nodes are not frozen and dont need to be frozen
1632            until the election has ended and we start the actual recovery
1633         */
1634         if (update_recmaster == true) {
1635                 /* first we assume we will win the election and set 
1636                    recoverymaster to be ourself on the current node
1637                  */
1638                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1639                 if (ret != 0) {
1640                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1641                         return -1;
1642                 }
1643         }
1644
1645
1646         return 0;
1647 }
1648
1649 /*
1650   this function will unban all nodes in the cluster
1651 */
1652 static void unban_all_nodes(struct ctdb_context *ctdb)
1653 {
1654         int ret, i;
1655         struct ctdb_node_map *nodemap;
1656         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1657         
1658         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1659         if (ret != 0) {
1660                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1661                 return;
1662         }
1663
1664         for (i=0;i<nodemap->num;i++) {
1665                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1666                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1667                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1668                 }
1669         }
1670
1671         talloc_free(tmp_ctx);
1672 }
1673
1674
1675 /*
1676   we think we are winning the election - send a broadcast election request
1677  */
1678 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1679 {
1680         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1681         int ret;
1682
1683         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1684         if (ret != 0) {
1685                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1686         }
1687
1688         talloc_free(rec->send_election_te);
1689         rec->send_election_te = NULL;
1690 }
1691
1692 /*
1693   handler for memory dumps
1694 */
1695 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1696                              TDB_DATA data, void *private_data)
1697 {
1698         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1699         TDB_DATA *dump;
1700         int ret;
1701         struct rd_memdump_reply *rd;
1702
1703         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1704                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1705                 talloc_free(tmp_ctx);
1706                 return;
1707         }
1708         rd = (struct rd_memdump_reply *)data.dptr;
1709
1710         dump = talloc_zero(tmp_ctx, TDB_DATA);
1711         if (dump == NULL) {
1712                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1713                 talloc_free(tmp_ctx);
1714                 return;
1715         }
1716         ret = ctdb_dump_memory(ctdb, dump);
1717         if (ret != 0) {
1718                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1719                 talloc_free(tmp_ctx);
1720                 return;
1721         }
1722
1723 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1724
1725         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1726         if (ret != 0) {
1727                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1728                 talloc_free(tmp_ctx);
1729                 return;
1730         }
1731
1732         talloc_free(tmp_ctx);
1733 }
1734
1735 /*
1736   handler for reload_nodes
1737 */
1738 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1739                              TDB_DATA data, void *private_data)
1740 {
1741         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1742
1743         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1744
1745         reload_nodes_file(rec->ctdb);
1746 }
1747
1748
1749 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1750                               struct timeval yt, void *p)
1751 {
1752         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1753
1754         talloc_free(rec->ip_check_disable_ctx);
1755         rec->ip_check_disable_ctx = NULL;
1756 }
1757
1758 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1759                              TDB_DATA data, void *private_data)
1760 {
1761         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1762         uint32_t timeout;
1763
1764         if (rec->ip_check_disable_ctx != NULL) {
1765                 talloc_free(rec->ip_check_disable_ctx);
1766                 rec->ip_check_disable_ctx = NULL;
1767         }
1768
1769         if (data.dsize != sizeof(uint32_t)) {
1770                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu expexting %lu\n", data.dsize, sizeof(uint32_t)));
1771                 return;
1772         }
1773         if (data.dptr == NULL) {
1774                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1775                 return;
1776         }
1777
1778         timeout = *((uint32_t *)data.dptr);
1779         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1780
1781         rec->ip_check_disable_ctx = talloc_new(rec);
1782         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1783
1784         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1785 }
1786
1787
1788 /*
1789   handler for ip reallocate, just add it to the list of callers and 
1790   handle this later in the monitor_cluster loop so we do not recurse
1791   with other callers to takeover_run()
1792 */
1793 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1794                              TDB_DATA data, void *private_data)
1795 {
1796         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1797         struct ip_reallocate_list *caller;
1798
1799         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1800                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1801                 return;
1802         }
1803
1804         if (rec->ip_reallocate_ctx == NULL) {
1805                 rec->ip_reallocate_ctx = talloc_new(rec);
1806                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1807         }
1808
1809         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1810         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1811
1812         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1813         caller->next = rec->reallocate_callers;
1814         rec->reallocate_callers = caller;
1815
1816         return;
1817 }
1818
1819 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1820 {
1821         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1822         TDB_DATA result;
1823         int32_t ret;
1824         struct ip_reallocate_list *callers;
1825
1826         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1827         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1828         result.dsize = sizeof(int32_t);
1829         result.dptr  = (uint8_t *)&ret;
1830
1831         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1832                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1833                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1834                 if (ret != 0) {
1835                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1836                 }
1837         }
1838
1839         talloc_free(tmp_ctx);
1840         talloc_free(rec->ip_reallocate_ctx);
1841         rec->ip_reallocate_ctx = NULL;
1842         rec->reallocate_callers = NULL;
1843         
1844 }
1845
1846
1847 /*
1848   handler for recovery master elections
1849 */
1850 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1851                              TDB_DATA data, void *private_data)
1852 {
1853         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1854         int ret;
1855         struct election_message *em = (struct election_message *)data.dptr;
1856         TALLOC_CTX *mem_ctx;
1857
1858         /* we got an election packet - update the timeout for the election */
1859         talloc_free(rec->election_timeout);
1860         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1861                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1862                                                 ctdb_election_timeout, rec);
1863
1864         mem_ctx = talloc_new(ctdb);
1865
1866         /* someone called an election. check their election data
1867            and if we disagree and we would rather be the elected node, 
1868            send a new election message to all other nodes
1869          */
1870         if (ctdb_election_win(rec, em)) {
1871                 if (!rec->send_election_te) {
1872                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1873                                                                 timeval_current_ofs(0, 500000),
1874                                                                 election_send_request, rec);
1875                 }
1876                 talloc_free(mem_ctx);
1877                 /*unban_all_nodes(ctdb);*/
1878                 return;
1879         }
1880         
1881         /* we didn't win */
1882         talloc_free(rec->send_election_te);
1883         rec->send_election_te = NULL;
1884
1885         if (ctdb->tunable.verify_recovery_lock != 0) {
1886                 /* release the recmaster lock */
1887                 if (em->pnn != ctdb->pnn &&
1888                     ctdb->recovery_lock_fd != -1) {
1889                         close(ctdb->recovery_lock_fd);
1890                         ctdb->recovery_lock_fd = -1;
1891                         unban_all_nodes(ctdb);
1892                 }
1893         }
1894
1895         /* ok, let that guy become recmaster then */
1896         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1899                 talloc_free(mem_ctx);
1900                 return;
1901         }
1902
1903         talloc_free(mem_ctx);
1904         return;
1905 }
1906
1907
1908 /*
1909   force the start of the election process
1910  */
1911 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1912                            struct ctdb_node_map *nodemap)
1913 {
1914         int ret;
1915         struct ctdb_context *ctdb = rec->ctdb;
1916
1917         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1918
1919         /* set all nodes to recovery mode to stop all internode traffic */
1920         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1921         if (ret != 0) {
1922                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1923                 return;
1924         }
1925
1926         talloc_free(rec->election_timeout);
1927         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1928                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1929                                                 ctdb_election_timeout, rec);
1930
1931         ret = send_election_request(rec, pnn, true);
1932         if (ret!=0) {
1933                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1934                 return;
1935         }
1936
1937         /* wait for a few seconds to collect all responses */
1938         ctdb_wait_election(rec);
1939 }
1940
1941
1942
1943 /*
1944   handler for when a node changes its flags
1945 */
1946 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1947                             TDB_DATA data, void *private_data)
1948 {
1949         int ret;
1950         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1951         struct ctdb_node_map *nodemap=NULL;
1952         TALLOC_CTX *tmp_ctx;
1953         uint32_t changed_flags;
1954         int i;
1955         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1956         int disabled_flag_changed;
1957
1958         if (data.dsize != sizeof(*c)) {
1959                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1960                 return;
1961         }
1962
1963         tmp_ctx = talloc_new(ctdb);
1964         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1965
1966         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1967         if (ret != 0) {
1968                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1969                 talloc_free(tmp_ctx);
1970                 return;         
1971         }
1972
1973
1974         for (i=0;i<nodemap->num;i++) {
1975                 if (nodemap->nodes[i].pnn == c->pnn) break;
1976         }
1977
1978         if (i == nodemap->num) {
1979                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1980                 talloc_free(tmp_ctx);
1981                 return;
1982         }
1983
1984         changed_flags = c->old_flags ^ c->new_flags;
1985
1986         if (nodemap->nodes[i].flags != c->new_flags) {
1987                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1988         }
1989
1990         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
1991
1992         nodemap->nodes[i].flags = c->new_flags;
1993
1994         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1995                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1996
1997         if (ret == 0) {
1998                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1999                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2000         }
2001         
2002         if (ret == 0 &&
2003             ctdb->recovery_master == ctdb->pnn &&
2004             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2005                 /* Only do the takeover run if the perm disabled or unhealthy
2006                    flags changed since these will cause an ip failover but not
2007                    a recovery.
2008                    If the node became disconnected or banned this will also
2009                    lead to an ip address failover but that is handled 
2010                    during recovery
2011                 */
2012                 if (disabled_flag_changed) {
2013                         rec->need_takeover_run = true;
2014                 }
2015         }
2016
2017         talloc_free(tmp_ctx);
2018 }
2019
2020 /*
2021   handler for when we need to push out flag changes ot all other nodes
2022 */
2023 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2024                             TDB_DATA data, void *private_data)
2025 {
2026         int ret;
2027         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2028
2029         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
2030         if (ret != 0) {
2031                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2032         }
2033 }
2034
2035
2036 struct verify_recmode_normal_data {
2037         uint32_t count;
2038         enum monitor_result status;
2039 };
2040
2041 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2042 {
2043         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2044
2045
2046         /* one more node has responded with recmode data*/
2047         rmdata->count--;
2048
2049         /* if we failed to get the recmode, then return an error and let
2050            the main loop try again.
2051         */
2052         if (state->state != CTDB_CONTROL_DONE) {
2053                 if (rmdata->status == MONITOR_OK) {
2054                         rmdata->status = MONITOR_FAILED;
2055                 }
2056                 return;
2057         }
2058
2059         /* if we got a response, then the recmode will be stored in the
2060            status field
2061         */
2062         if (state->status != CTDB_RECOVERY_NORMAL) {
2063                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2064                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2065         }
2066
2067         return;
2068 }
2069
2070
2071 /* verify that all nodes are in normal recovery mode */
2072 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2073 {
2074         struct verify_recmode_normal_data *rmdata;
2075         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2076         struct ctdb_client_control_state *state;
2077         enum monitor_result status;
2078         int j;
2079         
2080         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2081         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2082         rmdata->count  = 0;
2083         rmdata->status = MONITOR_OK;
2084
2085         /* loop over all active nodes and send an async getrecmode call to 
2086            them*/
2087         for (j=0; j<nodemap->num; j++) {
2088                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2089                         continue;
2090                 }
2091                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2092                                         CONTROL_TIMEOUT(), 
2093                                         nodemap->nodes[j].pnn);
2094                 if (state == NULL) {
2095                         /* we failed to send the control, treat this as 
2096                            an error and try again next iteration
2097                         */                      
2098                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2099                         talloc_free(mem_ctx);
2100                         return MONITOR_FAILED;
2101                 }
2102
2103                 /* set up the callback functions */
2104                 state->async.fn = verify_recmode_normal_callback;
2105                 state->async.private_data = rmdata;
2106
2107                 /* one more control to wait for to complete */
2108                 rmdata->count++;
2109         }
2110
2111
2112         /* now wait for up to the maximum number of seconds allowed
2113            or until all nodes we expect a response from has replied
2114         */
2115         while (rmdata->count > 0) {
2116                 event_loop_once(ctdb->ev);
2117         }
2118
2119         status = rmdata->status;
2120         talloc_free(mem_ctx);
2121         return status;
2122 }
2123
2124
2125 struct verify_recmaster_data {
2126         struct ctdb_recoverd *rec;
2127         uint32_t count;
2128         uint32_t pnn;
2129         enum monitor_result status;
2130 };
2131
2132 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2133 {
2134         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2135
2136
2137         /* one more node has responded with recmaster data*/
2138         rmdata->count--;
2139
2140         /* if we failed to get the recmaster, then return an error and let
2141            the main loop try again.
2142         */
2143         if (state->state != CTDB_CONTROL_DONE) {
2144                 if (rmdata->status == MONITOR_OK) {
2145                         rmdata->status = MONITOR_FAILED;
2146                 }
2147                 return;
2148         }
2149
2150         /* if we got a response, then the recmaster will be stored in the
2151            status field
2152         */
2153         if (state->status != rmdata->pnn) {
2154                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2155                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2156                 rmdata->status = MONITOR_ELECTION_NEEDED;
2157         }
2158
2159         return;
2160 }
2161
2162
2163 /* verify that all nodes agree that we are the recmaster */
2164 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2165 {
2166         struct ctdb_context *ctdb = rec->ctdb;
2167         struct verify_recmaster_data *rmdata;
2168         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2169         struct ctdb_client_control_state *state;
2170         enum monitor_result status;
2171         int j;
2172         
2173         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2174         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2175         rmdata->rec    = rec;
2176         rmdata->count  = 0;
2177         rmdata->pnn    = pnn;
2178         rmdata->status = MONITOR_OK;
2179
2180         /* loop over all active nodes and send an async getrecmaster call to 
2181            them*/
2182         for (j=0; j<nodemap->num; j++) {
2183                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2184                         continue;
2185                 }
2186                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2187                                         CONTROL_TIMEOUT(),
2188                                         nodemap->nodes[j].pnn);
2189                 if (state == NULL) {
2190                         /* we failed to send the control, treat this as 
2191                            an error and try again next iteration
2192                         */                      
2193                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2194                         talloc_free(mem_ctx);
2195                         return MONITOR_FAILED;
2196                 }
2197
2198                 /* set up the callback functions */
2199                 state->async.fn = verify_recmaster_callback;
2200                 state->async.private_data = rmdata;
2201
2202                 /* one more control to wait for to complete */
2203                 rmdata->count++;
2204         }
2205
2206
2207         /* now wait for up to the maximum number of seconds allowed
2208            or until all nodes we expect a response from has replied
2209         */
2210         while (rmdata->count > 0) {
2211                 event_loop_once(ctdb->ev);
2212         }
2213
2214         status = rmdata->status;
2215         talloc_free(mem_ctx);
2216         return status;
2217 }
2218
2219
2220 /* called to check that the allocation of public ip addresses is ok.
2221 */
2222 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2223 {
2224         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2225         struct ctdb_all_public_ips *ips = NULL;
2226         struct ctdb_uptime *uptime1 = NULL;
2227         struct ctdb_uptime *uptime2 = NULL;
2228         int ret, j;
2229
2230         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2231                                 CTDB_CURRENT_NODE, &uptime1);
2232         if (ret != 0) {
2233                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2234                 talloc_free(mem_ctx);
2235                 return -1;
2236         }
2237
2238         /* read the ip allocation from the local node */
2239         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2240         if (ret != 0) {
2241                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2242                 talloc_free(mem_ctx);
2243                 return -1;
2244         }
2245
2246         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2247                                 CTDB_CURRENT_NODE, &uptime2);
2248         if (ret != 0) {
2249                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2250                 talloc_free(mem_ctx);
2251                 return -1;
2252         }
2253
2254         /* skip the check if the startrecovery time has changed */
2255         if (timeval_compare(&uptime1->last_recovery_started,
2256                             &uptime2->last_recovery_started) != 0) {
2257                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2258                 talloc_free(mem_ctx);
2259                 return 0;
2260         }
2261
2262         /* skip the check if the endrecovery time has changed */
2263         if (timeval_compare(&uptime1->last_recovery_finished,
2264                             &uptime2->last_recovery_finished) != 0) {
2265                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2266                 talloc_free(mem_ctx);
2267                 return 0;
2268         }
2269
2270         /* skip the check if we have started but not finished recovery */
2271         if (timeval_compare(&uptime1->last_recovery_finished,
2272                             &uptime1->last_recovery_started) != 1) {
2273                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2274                 talloc_free(mem_ctx);
2275
2276                 return 0;
2277         }
2278
2279         /* verify that we have the ip addresses we should have
2280            and we dont have ones we shouldnt have.
2281            if we find an inconsistency we set recmode to
2282            active on the local node and wait for the recmaster
2283            to do a full blown recovery
2284         */
2285         for (j=0; j<ips->num; j++) {
2286                 if (ips->ips[j].pnn == pnn) {
2287                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2288                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2289                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2290                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2291                                 if (ret != 0) {
2292                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2293
2294                                         talloc_free(mem_ctx);
2295                                         return -1;
2296                                 }
2297                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2298                                 if (ret != 0) {
2299                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2300
2301                                         talloc_free(mem_ctx);
2302                                         return -1;
2303                                 }
2304                         }
2305                 } else {
2306                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2307                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2308                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2309
2310                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2311                                 if (ret != 0) {
2312                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2313
2314                                         talloc_free(mem_ctx);
2315                                         return -1;
2316                                 }
2317                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2318                                 if (ret != 0) {
2319                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2320
2321                                         talloc_free(mem_ctx);
2322                                         return -1;
2323                                 }
2324                         }
2325                 }
2326         }
2327
2328         talloc_free(mem_ctx);
2329         return 0;
2330 }
2331
2332
2333 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2334 {
2335         struct ctdb_node_map **remote_nodemaps = callback_data;
2336
2337         if (node_pnn >= ctdb->num_nodes) {
2338                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2339                 return;
2340         }
2341
2342         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2343
2344 }
2345
2346 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2347         struct ctdb_node_map *nodemap,
2348         struct ctdb_node_map **remote_nodemaps)
2349 {
2350         uint32_t *nodes;
2351
2352         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2353         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2354                                         nodes,
2355                                         CONTROL_TIMEOUT(), false, tdb_null,
2356                                         async_getnodemap_callback,
2357                                         NULL,
2358                                         remote_nodemaps) != 0) {
2359                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2360
2361                 return -1;
2362         }
2363
2364         return 0;
2365 }
2366
2367 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2368 struct ctdb_check_reclock_state {
2369         struct ctdb_context *ctdb;
2370         struct timeval start_time;
2371         int fd[2];
2372         pid_t child;
2373         struct timed_event *te;
2374         struct fd_event *fde;
2375         enum reclock_child_status status;
2376 };
2377
2378 /* when we free the reclock state we must kill any child process.
2379 */
2380 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2381 {
2382         struct ctdb_context *ctdb = state->ctdb;
2383
2384         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2385
2386         if (state->fd[0] != -1) {
2387                 close(state->fd[0]);
2388                 state->fd[0] = -1;
2389         }
2390         if (state->fd[1] != -1) {
2391                 close(state->fd[1]);
2392                 state->fd[1] = -1;
2393         }
2394         kill(state->child, SIGKILL);
2395         return 0;
2396 }
2397
2398 /*
2399   called if our check_reclock child times out. this would happen if
2400   i/o to the reclock file blocks.
2401  */
2402 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2403                                          struct timeval t, void *private_data)
2404 {
2405         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2406                                            struct ctdb_check_reclock_state);
2407
2408         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2409         state->status = RECLOCK_TIMEOUT;
2410 }
2411
2412 /* this is called when the child process has completed checking the reclock
2413    file and has written data back to us through the pipe.
2414 */
2415 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2416                              uint16_t flags, void *private_data)
2417 {
2418         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2419                                              struct ctdb_check_reclock_state);
2420         char c = 0;
2421         int ret;
2422
2423         /* we got a response from our child process so we can abort the
2424            timeout.
2425         */
2426         talloc_free(state->te);
2427         state->te = NULL;
2428
2429         ret = read(state->fd[0], &c, 1);
2430         if (ret != 1 || c != RECLOCK_OK) {
2431                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2432                 state->status = RECLOCK_FAILED;
2433
2434                 return;
2435         }
2436
2437         state->status = RECLOCK_OK;
2438         return;
2439 }
2440
2441 static int check_recovery_lock(struct ctdb_context *ctdb)
2442 {
2443         int ret;
2444         struct ctdb_check_reclock_state *state;
2445         pid_t parent = getpid();
2446
2447         if (ctdb->recovery_lock_fd == -1) {
2448                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2449                 return -1;
2450         }
2451
2452         state = talloc(ctdb, struct ctdb_check_reclock_state);
2453         CTDB_NO_MEMORY(ctdb, state);
2454
2455         state->ctdb = ctdb;
2456         state->start_time = timeval_current();
2457         state->status = RECLOCK_CHECKING;
2458         state->fd[0] = -1;
2459         state->fd[1] = -1;
2460
2461         ret = pipe(state->fd);
2462         if (ret != 0) {
2463                 talloc_free(state);
2464                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2465                 return -1;
2466         }
2467
2468         state->child = fork();
2469         if (state->child == (pid_t)-1) {
2470                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2471                 close(state->fd[0]);
2472                 state->fd[0] = -1;
2473                 close(state->fd[1]);
2474                 state->fd[1] = -1;
2475                 talloc_free(state);
2476                 return -1;
2477         }
2478
2479         if (state->child == 0) {
2480                 char cc = RECLOCK_OK;
2481                 close(state->fd[0]);
2482                 state->fd[0] = -1;
2483
2484                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2485                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2486                         cc = RECLOCK_FAILED;
2487                 }
2488
2489                 write(state->fd[1], &cc, 1);
2490                 /* make sure we die when our parent dies */
2491                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2492                         sleep(5);
2493                         write(state->fd[1], &cc, 1);
2494                 }
2495                 _exit(0);
2496         }
2497         close(state->fd[1]);
2498         state->fd[1] = -1;
2499
2500         talloc_set_destructor(state, check_reclock_destructor);
2501
2502         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2503                                     ctdb_check_reclock_timeout, state);
2504         if (state->te == NULL) {
2505                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2506                 talloc_free(state);
2507                 return -1;
2508         }
2509
2510         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2511                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2512                                 reclock_child_handler,
2513                                 (void *)state);
2514
2515         if (state->fde == NULL) {
2516                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2517                 talloc_free(state);
2518                 return -1;
2519         }
2520
2521         while (state->status == RECLOCK_CHECKING) {
2522                 event_loop_once(ctdb->ev);
2523         }
2524
2525         if (state->status == RECLOCK_FAILED) {
2526                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2527                 close(ctdb->recovery_lock_fd);
2528                 ctdb->recovery_lock_fd = -1;
2529                 talloc_free(state);
2530                 return -1;
2531         }
2532
2533         talloc_free(state);
2534         return 0;
2535 }
2536
2537 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2538 {
2539         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2540         const char *reclockfile;
2541
2542         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2543                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2544                 talloc_free(tmp_ctx);
2545                 return -1;      
2546         }
2547
2548         if (reclockfile == NULL) {
2549                 if (ctdb->recovery_lock_file != NULL) {
2550                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2551                         talloc_free(ctdb->recovery_lock_file);
2552                         ctdb->recovery_lock_file = NULL;
2553                         if (ctdb->recovery_lock_fd != -1) {
2554                                 close(ctdb->recovery_lock_fd);
2555                                 ctdb->recovery_lock_fd = -1;
2556                         }
2557                 }
2558                 ctdb->tunable.verify_recovery_lock = 0;
2559                 talloc_free(tmp_ctx);
2560                 return 0;
2561         }
2562
2563         if (ctdb->recovery_lock_file == NULL) {
2564                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2565                 if (ctdb->recovery_lock_fd != -1) {
2566                         close(ctdb->recovery_lock_fd);
2567                         ctdb->recovery_lock_fd = -1;
2568                 }
2569                 talloc_free(tmp_ctx);
2570                 return 0;
2571         }
2572
2573
2574         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2575                 talloc_free(tmp_ctx);
2576                 return 0;
2577         }
2578
2579         talloc_free(ctdb->recovery_lock_file);
2580         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2581         ctdb->tunable.verify_recovery_lock = 0;
2582         if (ctdb->recovery_lock_fd != -1) {
2583                 close(ctdb->recovery_lock_fd);
2584                 ctdb->recovery_lock_fd = -1;
2585         }
2586
2587         talloc_free(tmp_ctx);
2588         return 0;
2589 }
2590                 
2591 /*
2592   the main monitoring loop
2593  */
2594 static void monitor_cluster(struct ctdb_context *ctdb)
2595 {
2596         uint32_t pnn;
2597         TALLOC_CTX *mem_ctx=NULL;
2598         struct ctdb_node_map *nodemap=NULL;
2599         struct ctdb_node_map *recmaster_nodemap=NULL;
2600         struct ctdb_node_map **remote_nodemaps=NULL;
2601         struct ctdb_vnn_map *vnnmap=NULL;
2602         struct ctdb_vnn_map *remote_vnnmap=NULL;
2603         int32_t debug_level;
2604         int i, j, ret;
2605         struct ctdb_recoverd *rec;
2606
2607         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2608
2609         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2610         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2611
2612         rec->ctdb = ctdb;
2613
2614         rec->priority_time = timeval_current();
2615
2616         /* register a message port for sending memory dumps */
2617         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2618
2619         /* register a message port for recovery elections */
2620         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2621
2622         /* when nodes are disabled/enabled */
2623         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2624
2625         /* when we are asked to puch out a flag change */
2626         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2627
2628         /* register a message port for vacuum fetch */
2629         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2630
2631         /* register a message port for reloadnodes  */
2632         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2633
2634         /* register a message port for performing a takeover run */
2635         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2636
2637         /* register a message port for disabling the ip check for a short while */
2638         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2639
2640 again:
2641         if (mem_ctx) {
2642                 talloc_free(mem_ctx);
2643                 mem_ctx = NULL;
2644         }
2645         mem_ctx = talloc_new(ctdb);
2646         if (!mem_ctx) {
2647                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2648                 exit(-1);
2649         }
2650
2651         /* we only check for recovery once every second */
2652         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2653
2654         /* verify that the main daemon is still running */
2655         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2656                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2657                 exit(-1);
2658         }
2659
2660         /* ping the local daemon to tell it we are alive */
2661         ctdb_ctrl_recd_ping(ctdb);
2662
2663         if (rec->election_timeout) {
2664                 /* an election is in progress */
2665                 goto again;
2666         }
2667
2668         /* read the debug level from the parent and update locally */
2669         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2670         if (ret !=0) {
2671                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2672                 goto again;
2673         }
2674         LogLevel = debug_level;
2675
2676
2677         /* We must check if we need to ban a node here but we want to do this
2678            as early as possible so we dont wait until we have pulled the node
2679            map from the local node. thats why we have the hardcoded value 20
2680         */
2681         for (i=0; i<ctdb->num_nodes; i++) {
2682                 struct ctdb_banning_state *ban_state;
2683
2684                 if (ctdb->nodes[i]->ban_state == NULL) {
2685                         continue;
2686                 }
2687                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2688                 if (ban_state->count < 20) {
2689                         continue;
2690                 }
2691                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2692                         ctdb->nodes[i]->pnn, ban_state->count,
2693                         ctdb->tunable.recovery_ban_period));
2694                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2695                 ban_state->count = 0;
2696         }
2697
2698         /* get relevant tunables */
2699         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2700         if (ret != 0) {
2701                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2702                 goto again;
2703         }
2704
2705         /* get the current recovery lock file from the server */
2706         if (update_recovery_lock_file(ctdb) != 0) {
2707                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2708                 goto again;
2709         }
2710
2711         /* Make sure that if recovery lock verification becomes disabled when
2712            we close the file
2713         */
2714         if (ctdb->tunable.verify_recovery_lock == 0) {
2715                 if (ctdb->recovery_lock_fd != -1) {
2716                         close(ctdb->recovery_lock_fd);
2717                         ctdb->recovery_lock_fd = -1;
2718                 }
2719         }
2720
2721         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2722         if (pnn == (uint32_t)-1) {
2723                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2724                 goto again;
2725         }
2726
2727         /* get the vnnmap */
2728         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2729         if (ret != 0) {
2730                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2731                 goto again;
2732         }
2733
2734
2735         /* get number of nodes */
2736         if (rec->nodemap) {
2737                 talloc_free(rec->nodemap);
2738                 rec->nodemap = NULL;
2739                 nodemap=NULL;
2740         }
2741         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2742         if (ret != 0) {
2743                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2744                 goto again;
2745         }
2746         nodemap = rec->nodemap;
2747
2748         /* check which node is the recovery master */
2749         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2750         if (ret != 0) {
2751                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2752                 goto again;
2753         }
2754
2755         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2756         if (rec->recmaster != pnn) {
2757                 if (rec->ip_reallocate_ctx != NULL) {
2758                         talloc_free(rec->ip_reallocate_ctx);
2759                         rec->ip_reallocate_ctx = NULL;
2760                         rec->reallocate_callers = NULL;
2761                 }
2762         }
2763         /* if there are takeovers requested, perform it and notify the waiters */
2764         if (rec->reallocate_callers) {
2765                 process_ipreallocate_requests(ctdb, rec);
2766         }
2767
2768         if (rec->recmaster == (uint32_t)-1) {
2769                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2770                 force_election(rec, pnn, nodemap);
2771                 goto again;
2772         }
2773
2774
2775         /* if the local daemon is STOPPED, we verify that the databases are
2776            also frozen and thet the recmode is set to active 
2777         */
2778         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2779                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2780                 if (ret != 0) {
2781                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2782                 }
2783                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2784                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2785
2786                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2787                         if (ret != 0) {
2788                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2789                                 goto again;
2790                         }
2791                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2792                         if (ret != 0) {
2793                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2794
2795                                 goto again;
2796                         }
2797                         goto again;
2798                 }
2799         }
2800         /* If the local node is stopped, verify we are not the recmaster 
2801            and yield this role if so
2802         */
2803         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2804                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2805                 force_election(rec, pnn, nodemap);
2806                 goto again;
2807         }
2808         
2809         /* check that we (recovery daemon) and the local ctdb daemon
2810            agrees on whether we are banned or not
2811         */
2812 //qqq
2813
2814         /* remember our own node flags */
2815         rec->node_flags = nodemap->nodes[pnn].flags;
2816
2817         /* count how many active nodes there are */
2818         rec->num_active    = 0;
2819         rec->num_connected = 0;
2820         for (i=0; i<nodemap->num; i++) {
2821                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2822                         rec->num_active++;
2823                 }
2824                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2825                         rec->num_connected++;
2826                 }
2827         }
2828
2829
2830         /* verify that the recmaster node is still active */
2831         for (j=0; j<nodemap->num; j++) {
2832                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2833                         break;
2834                 }
2835         }
2836
2837         if (j == nodemap->num) {
2838                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2839                 force_election(rec, pnn, nodemap);
2840                 goto again;
2841         }
2842
2843         /* if recovery master is disconnected we must elect a new recmaster */
2844         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2845                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2846                 force_election(rec, pnn, nodemap);
2847                 goto again;
2848         }
2849
2850         /* grap the nodemap from the recovery master to check if it is banned */
2851         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2852                                    mem_ctx, &recmaster_nodemap);
2853         if (ret != 0) {
2854                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2855                           nodemap->nodes[j].pnn));
2856                 goto again;
2857         }
2858
2859
2860         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2861                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2862                 force_election(rec, pnn, nodemap);
2863                 goto again;
2864         }
2865
2866
2867         /* verify that we have all ip addresses we should have and we dont
2868          * have addresses we shouldnt have.
2869          */ 
2870         if (ctdb->do_checkpublicip) {
2871                 if (rec->ip_check_disable_ctx == NULL) {
2872                         if (verify_ip_allocation(ctdb, pnn) != 0) {
2873                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2874                                 goto again;
2875                         }
2876                 }
2877         }
2878
2879
2880         /* if we are not the recmaster then we do not need to check
2881            if recovery is needed
2882          */
2883         if (pnn != rec->recmaster) {
2884                 goto again;
2885         }
2886
2887
2888         /* ensure our local copies of flags are right */
2889         ret = update_local_flags(rec, nodemap);
2890         if (ret == MONITOR_ELECTION_NEEDED) {
2891                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2892                 force_election(rec, pnn, nodemap);
2893                 goto again;
2894         }
2895         if (ret != MONITOR_OK) {
2896                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2897                 goto again;
2898         }
2899
2900         /* update the list of public ips that a node can handle for
2901            all connected nodes
2902         */
2903         if (ctdb->num_nodes != nodemap->num) {
2904                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2905                 reload_nodes_file(ctdb);
2906                 goto again;
2907         }
2908         for (j=0; j<nodemap->num; j++) {
2909                 /* release any existing data */
2910                 if (ctdb->nodes[j]->public_ips) {
2911                         talloc_free(ctdb->nodes[j]->public_ips);
2912                         ctdb->nodes[j]->public_ips = NULL;
2913                 }
2914
2915                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2916                         continue;
2917                 }
2918
2919                 /* grab a new shiny list of public ips from the node */
2920                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2921                         ctdb->nodes[j]->pnn, 
2922                         ctdb->nodes,
2923                         &ctdb->nodes[j]->public_ips)) {
2924                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2925                                 ctdb->nodes[j]->pnn));
2926                         goto again;
2927                 }
2928         }
2929
2930
2931         /* verify that all active nodes agree that we are the recmaster */
2932         switch (verify_recmaster(rec, nodemap, pnn)) {
2933         case MONITOR_RECOVERY_NEEDED:
2934                 /* can not happen */
2935                 goto again;
2936         case MONITOR_ELECTION_NEEDED:
2937                 force_election(rec, pnn, nodemap);
2938                 goto again;
2939         case MONITOR_OK:
2940                 break;
2941         case MONITOR_FAILED:
2942                 goto again;
2943         }
2944
2945
2946         if (rec->need_recovery) {
2947                 /* a previous recovery didn't finish */
2948                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2949                 goto again;             
2950         }
2951
2952         /* verify that all active nodes are in normal mode 
2953            and not in recovery mode 
2954         */
2955         switch (verify_recmode(ctdb, nodemap)) {
2956         case MONITOR_RECOVERY_NEEDED:
2957                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2958                 goto again;
2959         case MONITOR_FAILED:
2960                 goto again;
2961         case MONITOR_ELECTION_NEEDED:
2962                 /* can not happen */
2963         case MONITOR_OK:
2964                 break;
2965         }
2966
2967
2968         if (ctdb->tunable.verify_recovery_lock != 0) {
2969                 /* we should have the reclock - check its not stale */
2970                 ret = check_recovery_lock(ctdb);
2971                 if (ret != 0) {
2972                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2973                         ctdb_set_culprit(rec, ctdb->pnn);
2974                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2975                         goto again;
2976                 }
2977         }
2978
2979         /* get the nodemap for all active remote nodes
2980          */
2981         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2982         if (remote_nodemaps == NULL) {
2983                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2984                 goto again;
2985         }
2986         for(i=0; i<nodemap->num; i++) {
2987                 remote_nodemaps[i] = NULL;
2988         }
2989         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2990                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2991                 goto again;
2992         } 
2993
2994         /* verify that all other nodes have the same nodemap as we have
2995         */
2996         for (j=0; j<nodemap->num; j++) {
2997                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2998                         continue;
2999                 }
3000
3001                 if (remote_nodemaps[j] == NULL) {
3002                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3003                         ctdb_set_culprit(rec, j);
3004
3005                         goto again;
3006                 }
3007
3008                 /* if the nodes disagree on how many nodes there are
3009                    then this is a good reason to try recovery
3010                  */
3011                 if (remote_nodemaps[j]->num != nodemap->num) {
3012                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3013                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3014                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3015                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3016                         goto again;
3017                 }
3018
3019                 /* if the nodes disagree on which nodes exist and are
3020                    active, then that is also a good reason to do recovery
3021                  */
3022                 for (i=0;i<nodemap->num;i++) {
3023                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3024                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3025                                           nodemap->nodes[j].pnn, i, 
3026                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3027                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3028                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3029                                             vnnmap);
3030                                 goto again;
3031                         }
3032                 }
3033
3034                 /* verify the flags are consistent
3035                 */
3036                 for (i=0; i<nodemap->num; i++) {
3037                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3038                                 continue;
3039                         }
3040                         
3041                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3042                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3043                                   nodemap->nodes[j].pnn, 
3044                                   nodemap->nodes[i].pnn, 
3045                                   remote_nodemaps[j]->nodes[i].flags,
3046                                   nodemap->nodes[j].flags));
3047                                 if (i == j) {
3048                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3049                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3050                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3051                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3052                                                     vnnmap);
3053                                         goto again;
3054                                 } else {
3055                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3056                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3057                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3058                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3059                                                     vnnmap);
3060                                         goto again;
3061                                 }
3062                         }
3063                 }
3064         }
3065
3066
3067         /* there better be the same number of lmasters in the vnn map
3068            as there are active nodes or we will have to do a recovery
3069          */
3070         if (vnnmap->size != rec->num_active) {
3071                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3072                           vnnmap->size, rec->num_active));
3073                 ctdb_set_culprit(rec, ctdb->pnn);
3074                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3075                 goto again;
3076         }
3077
3078         /* verify that all active nodes in the nodemap also exist in 
3079            the vnnmap.
3080          */
3081         for (j=0; j<nodemap->num; j++) {
3082                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3083                         continue;
3084                 }
3085                 if (nodemap->nodes[j].pnn == pnn) {
3086                         continue;
3087                 }
3088
3089                 for (i=0; i<vnnmap->size; i++) {
3090                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3091                                 break;
3092                         }
3093                 }
3094                 if (i == vnnmap->size) {
3095                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3096                                   nodemap->nodes[j].pnn));
3097                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3098                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3099                         goto again;
3100                 }
3101         }
3102
3103         
3104         /* verify that all other nodes have the same vnnmap
3105            and are from the same generation
3106          */
3107         for (j=0; j<nodemap->num; j++) {
3108                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3109                         continue;
3110                 }
3111                 if (nodemap->nodes[j].pnn == pnn) {
3112                         continue;
3113                 }
3114
3115                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3116                                           mem_ctx, &remote_vnnmap);
3117                 if (ret != 0) {
3118                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3119                                   nodemap->nodes[j].pnn));
3120                         goto again;
3121                 }
3122
3123                 /* verify the vnnmap generation is the same */
3124                 if (vnnmap->generation != remote_vnnmap->generation) {
3125                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3126                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3127                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3128                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3129                         goto again;
3130                 }
3131
3132                 /* verify the vnnmap size is the same */
3133                 if (vnnmap->size != remote_vnnmap->size) {
3134                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3135                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3136                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3137                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3138                         goto again;
3139                 }
3140
3141                 /* verify the vnnmap is the same */
3142                 for (i=0;i<vnnmap->size;i++) {
3143                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3144                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3145                                           nodemap->nodes[j].pnn));
3146                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3147                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3148                                             vnnmap);
3149                                 goto again;
3150                         }
3151                 }
3152         }
3153
3154         /* we might need to change who has what IP assigned */
3155         if (rec->need_takeover_run) {
3156                 rec->need_takeover_run = false;
3157
3158                 /* execute the "startrecovery" event script on all nodes */
3159                 ret = run_startrecovery_eventscript(rec, nodemap);
3160                 if (ret!=0) {
3161                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3162                         ctdb_set_culprit(rec, ctdb->pnn);
3163                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3164                 }
3165
3166                 ret = ctdb_takeover_run(ctdb, nodemap);
3167                 if (ret != 0) {
3168                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3169                         ctdb_set_culprit(rec, ctdb->pnn);
3170                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3171                 }
3172
3173                 /* execute the "recovered" event script on all nodes */
3174                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3175 #if 0
3176 // we cant check whether the event completed successfully
3177 // since this script WILL fail if the node is in recovery mode
3178 // and if that race happens, the code here would just cause a second
3179 // cascading recovery.
3180                 if (ret!=0) {
3181                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3182                         ctdb_set_culprit(rec, ctdb->pnn);
3183                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3184                 }
3185 #endif
3186         }
3187
3188
3189         goto again;
3190
3191 }
3192
3193 /*
3194   event handler for when the main ctdbd dies
3195  */
3196 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3197                                  uint16_t flags, void *private_data)
3198 {
3199         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3200         _exit(1);
3201 }
3202
3203 /*
3204   called regularly to verify that the recovery daemon is still running
3205  */
3206 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3207                               struct timeval yt, void *p)
3208 {
3209         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3210
3211         if (kill(ctdb->recoverd_pid, 0) != 0) {
3212                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3213
3214                 ctdb_stop_recoverd(ctdb);
3215                 ctdb_stop_keepalive(ctdb);
3216                 ctdb_stop_monitoring(ctdb);
3217                 ctdb_release_all_ips(ctdb);
3218                 if (ctdb->methods != NULL) {
3219                         ctdb->methods->shutdown(ctdb);
3220                 }
3221                 ctdb_event_script(ctdb, "shutdown");
3222
3223                 exit(10);       
3224         }
3225
3226         event_add_timed(ctdb->ev, ctdb, 
3227                         timeval_current_ofs(30, 0),
3228                         ctdb_check_recd, ctdb);
3229 }
3230
3231 static void recd_sig_child_handler(struct event_context *ev,
3232         struct signal_event *se, int signum, int count,
3233         void *dont_care, 
3234         void *private_data)
3235 {
3236 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3237         int status;
3238         pid_t pid = -1;
3239
3240         while (pid != 0) {
3241                 pid = waitpid(-1, &status, WNOHANG);
3242                 if (pid == -1) {
3243                         if (errno != ECHILD) {
3244                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3245                         }
3246                         return;
3247                 }
3248                 if (pid > 0) {
3249                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3250                 }
3251         }
3252 }
3253
3254 /*
3255   startup the recovery daemon as a child of the main ctdb daemon
3256  */
3257 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3258 {
3259         int fd[2];
3260         struct signal_event *se;
3261
3262         if (pipe(fd) != 0) {
3263                 return -1;
3264         }
3265
3266         ctdb->ctdbd_pid = getpid();
3267
3268         ctdb->recoverd_pid = fork();
3269         if (ctdb->recoverd_pid == -1) {
3270                 return -1;
3271         }
3272         
3273         if (ctdb->recoverd_pid != 0) {
3274                 close(fd[0]);
3275                 event_add_timed(ctdb->ev, ctdb, 
3276                                 timeval_current_ofs(30, 0),
3277                                 ctdb_check_recd, ctdb);
3278                 return 0;
3279         }
3280
3281         close(fd[1]);
3282
3283         srandom(getpid() ^ time(NULL));
3284
3285         if (switch_from_server_to_client(ctdb) != 0) {
3286                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3287                 exit(1);
3288         }
3289
3290         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3291                      ctdb_recoverd_parent, &fd[0]);     
3292
3293         /* set up a handler to pick up sigchld */
3294         se = event_add_signal(ctdb->ev, ctdb,
3295                                      SIGCHLD, 0,
3296                                      recd_sig_child_handler,
3297                                      ctdb);
3298         if (se == NULL) {
3299                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3300                 exit(1);
3301         }
3302
3303         monitor_cluster(ctdb);
3304
3305         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3306         return -1;
3307 }
3308
3309 /*
3310   shutdown the recovery daemon
3311  */
3312 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3313 {
3314         if (ctdb->recoverd_pid == 0) {
3315                 return;
3316         }
3317
3318         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3319         kill(ctdb->recoverd_pid, SIGTERM);
3320 }