4df6ce0386aec59b30a9824f24f7df7e6a5139fc
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit > ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all node
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remove node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we dont need to ourself ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already have this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all database any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we dont need to ourself ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644                 if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finished election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are or we belive we are the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 return -1;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation ip. 
988    The generation id can not be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevent records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (rec->ip_check_disable_ctx == NULL) {
1281                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1282                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1283                                 rec->need_takeover_run = true;
1284                         }
1285                 }
1286
1287                 /* grab a new shiny list of public ips from the node */
1288                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1289                                         CONTROL_TIMEOUT(),
1290                                         ctdb->nodes[j]->pnn,
1291                                         ctdb->nodes,
1292                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1293                                         &ctdb->nodes[j]->available_public_ips);
1294                 if (ret != 0) {
1295                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1296                                 ctdb->nodes[j]->pnn));
1297                         if (culprit) {
1298                                 *culprit = ctdb->nodes[j]->pnn;
1299                         }
1300                         return -1;
1301                 }
1302         }
1303
1304         return 0;
1305 }
1306
1307 /* when we start a recovery, make sure all nodes use the same reclock file
1308    setting
1309 */
1310 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1311 {
1312         struct ctdb_context *ctdb = rec->ctdb;
1313         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316
1317         if (ctdb->recovery_lock_file == NULL) {
1318                 data.dptr  = NULL;
1319                 data.dsize = 0;
1320         } else {
1321                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1322                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1323         }
1324
1325         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1326         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1327                                         nodes, 0,
1328                                         CONTROL_TIMEOUT(),
1329                                         false, data,
1330                                         NULL, NULL,
1331                                         rec) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1333                 talloc_free(tmp_ctx);
1334                 return -1;
1335         }
1336
1337         talloc_free(tmp_ctx);
1338         return 0;
1339 }
1340
1341
1342 /*
1343   we are the recmaster, and recovery is needed - start a recovery run
1344  */
1345 static int do_recovery(struct ctdb_recoverd *rec, 
1346                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1347                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1348 {
1349         struct ctdb_context *ctdb = rec->ctdb;
1350         int i, j, ret;
1351         uint32_t generation;
1352         struct ctdb_dbid_map *dbmap;
1353         TDB_DATA data;
1354         uint32_t *nodes;
1355         struct timeval start_time;
1356         uint32_t culprit = (uint32_t)-1;
1357
1358         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1359
1360         /* if recovery fails, force it again */
1361         rec->need_recovery = true;
1362
1363         for (i=0; i<ctdb->num_nodes; i++) {
1364                 struct ctdb_banning_state *ban_state;
1365
1366                 if (ctdb->nodes[i]->ban_state == NULL) {
1367                         continue;
1368                 }
1369                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1370                 if (ban_state->count < 2*ctdb->num_nodes) {
1371                         continue;
1372                 }
1373                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1374                         ctdb->nodes[i]->pnn, ban_state->count,
1375                         ctdb->tunable.recovery_ban_period));
1376                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1377                 ban_state->count = 0;
1378         }
1379
1380
1381         if (ctdb->tunable.verify_recovery_lock != 0) {
1382                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1383                 start_time = timeval_current();
1384                 if (!ctdb_recovery_lock(ctdb, true)) {
1385                         ctdb_set_culprit(rec, pnn);
1386                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1387                         return -1;
1388                 }
1389                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1390                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1391         }
1392
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1394
1395         /* get a list of all databases */
1396         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1397         if (ret != 0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1399                 return -1;
1400         }
1401
1402         /* we do the db creation before we set the recovery mode, so the freeze happens
1403            on all databases we will be dealing with. */
1404
1405         /* verify that we have all the databases any other node has */
1406         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1407         if (ret != 0) {
1408                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1409                 return -1;
1410         }
1411
1412         /* verify that all other nodes have all our databases */
1413         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1416                 return -1;
1417         }
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1419
1420         /* update the database priority for all remote databases */
1421         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1422         if (ret != 0) {
1423                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1424         }
1425         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1426
1427
1428         /* update all other nodes to use the same setting for reclock files
1429            as the local recovery master.
1430         */
1431         sync_recovery_lock_file_across_cluster(rec);
1432
1433         /* set recovery mode to active on all nodes */
1434         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1437                 return -1;
1438         }
1439
1440         /* execute the "startrecovery" event script on all nodes */
1441         ret = run_startrecovery_eventscript(rec, nodemap);
1442         if (ret!=0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1444                 return -1;
1445         }
1446
1447         /*
1448           update all nodes to have the same flags that we have
1449          */
1450         for (i=0;i<nodemap->num;i++) {
1451                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1452                         continue;
1453                 }
1454
1455                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1456                 if (ret != 0) {
1457                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1458                         return -1;
1459                 }
1460         }
1461
1462         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1463
1464         /* pick a new generation number */
1465         generation = new_generation();
1466
1467         /* change the vnnmap on this node to use the new generation 
1468            number but not on any other nodes.
1469            this guarantees that if we abort the recovery prematurely
1470            for some reason (a node stops responding?)
1471            that we can just return immediately and we will reenter
1472            recovery shortly again.
1473            I.e. we deliberately leave the cluster with an inconsistent
1474            generation id to allow us to abort recovery at any stage and
1475            just restart it from scratch.
1476          */
1477         vnnmap->generation = generation;
1478         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1479         if (ret != 0) {
1480                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1481                 return -1;
1482         }
1483
1484         data.dptr = (void *)&generation;
1485         data.dsize = sizeof(uint32_t);
1486
1487         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1488         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1489                                         nodes, 0,
1490                                         CONTROL_TIMEOUT(), false, data,
1491                                         NULL,
1492                                         transaction_start_fail_callback,
1493                                         rec) != 0) {
1494                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1495                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1496                                         nodes, 0,
1497                                         CONTROL_TIMEOUT(), false, tdb_null,
1498                                         NULL,
1499                                         NULL,
1500                                         NULL) != 0) {
1501                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1502                 }
1503                 return -1;
1504         }
1505
1506         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1507
1508         for (i=0;i<dbmap->num;i++) {
1509                 ret = recover_database(rec, mem_ctx,
1510                                        dbmap->dbs[i].dbid,
1511                                        dbmap->dbs[i].persistent,
1512                                        pnn, nodemap, generation);
1513                 if (ret != 0) {
1514                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1515                         return -1;
1516                 }
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1520
1521         /* commit all the changes */
1522         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1523                                         nodes, 0,
1524                                         CONTROL_TIMEOUT(), false, data,
1525                                         NULL, NULL,
1526                                         NULL) != 0) {
1527                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1528                 return -1;
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1532         
1533
1534         /* update the capabilities for all nodes */
1535         ret = update_capabilities(ctdb, nodemap);
1536         if (ret!=0) {
1537                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1538                 return -1;
1539         }
1540
1541         /* build a new vnn map with all the currently active and
1542            unbanned nodes */
1543         generation = new_generation();
1544         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1545         CTDB_NO_MEMORY(ctdb, vnnmap);
1546         vnnmap->generation = generation;
1547         vnnmap->size = 0;
1548         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1549         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1550         for (i=j=0;i<nodemap->num;i++) {
1551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1552                         continue;
1553                 }
1554                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1555                         /* this node can not be an lmaster */
1556                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1557                         continue;
1558                 }
1559
1560                 vnnmap->size++;
1561                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1562                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1563                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1564
1565         }
1566         if (vnnmap->size == 0) {
1567                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1568                 vnnmap->size++;
1569                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1570                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1571                 vnnmap->map[0] = pnn;
1572         }       
1573
1574         /* update to the new vnnmap on all nodes */
1575         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1576         if (ret != 0) {
1577                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1578                 return -1;
1579         }
1580
1581         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1582
1583         /* update recmaster to point to us for all nodes */
1584         ret = set_recovery_master(ctdb, nodemap, pnn);
1585         if (ret!=0) {
1586                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1587                 return -1;
1588         }
1589
1590         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1591
1592         /*
1593           update all nodes to have the same flags that we have
1594          */
1595         for (i=0;i<nodemap->num;i++) {
1596                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1597                         continue;
1598                 }
1599
1600                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1601                 if (ret != 0) {
1602                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1603                         return -1;
1604                 }
1605         }
1606
1607         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1608
1609         /* disable recovery mode */
1610         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1611         if (ret != 0) {
1612                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1613                 return -1;
1614         }
1615
1616         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1617
1618         /*
1619           tell nodes to takeover their public IPs
1620          */
1621         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1622         if (ret != 0) {
1623                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1624                                  culprit));
1625                 return -1;
1626         }
1627         rec->need_takeover_run = false;
1628         ret = ctdb_takeover_run(ctdb, nodemap);
1629         if (ret != 0) {
1630                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1631                 return -1;
1632         }
1633         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1634
1635         /* execute the "recovered" event script on all nodes */
1636         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1637         if (ret!=0) {
1638                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1639                 return -1;
1640         }
1641
1642         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1643
1644         /* send a message to all clients telling them that the cluster 
1645            has been reconfigured */
1646         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1647
1648         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1649
1650         rec->need_recovery = false;
1651
1652         /* we managed to complete a full recovery, make sure to forgive
1653            any past sins by the nodes that could now participate in the
1654            recovery.
1655         */
1656         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1657         for (i=0;i<nodemap->num;i++) {
1658                 struct ctdb_banning_state *ban_state;
1659
1660                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1661                         continue;
1662                 }
1663
1664                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1665                 if (ban_state == NULL) {
1666                         continue;
1667                 }
1668
1669                 ban_state->count = 0;
1670         }
1671
1672
1673         /* We just finished a recovery successfully. 
1674            We now wait for rerecovery_timeout before we allow 
1675            another recovery to take place.
1676         */
1677         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1678         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1679         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1680
1681         return 0;
1682 }
1683
1684
1685 /*
1686   elections are won by first checking the number of connected nodes, then
1687   the priority time, then the pnn
1688  */
1689 struct election_message {
1690         uint32_t num_connected;
1691         struct timeval priority_time;
1692         uint32_t pnn;
1693         uint32_t node_flags;
1694 };
1695
1696 /*
1697   form this nodes election data
1698  */
1699 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1700 {
1701         int ret, i;
1702         struct ctdb_node_map *nodemap;
1703         struct ctdb_context *ctdb = rec->ctdb;
1704
1705         ZERO_STRUCTP(em);
1706
1707         em->pnn = rec->ctdb->pnn;
1708         em->priority_time = rec->priority_time;
1709
1710         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1711         if (ret != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1713                 return;
1714         }
1715
1716         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1717         em->node_flags = rec->node_flags;
1718
1719         for (i=0;i<nodemap->num;i++) {
1720                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1721                         em->num_connected++;
1722                 }
1723         }
1724
1725         /* we shouldnt try to win this election if we cant be a recmaster */
1726         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1727                 em->num_connected = 0;
1728                 em->priority_time = timeval_current();
1729         }
1730
1731         talloc_free(nodemap);
1732 }
1733
1734 /*
1735   see if the given election data wins
1736  */
1737 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1738 {
1739         struct election_message myem;
1740         int cmp = 0;
1741
1742         ctdb_election_data(rec, &myem);
1743
1744         /* we cant win if we dont have the recmaster capability */
1745         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1746                 return false;
1747         }
1748
1749         /* we cant win if we are banned */
1750         if (rec->node_flags & NODE_FLAGS_BANNED) {
1751                 return false;
1752         }       
1753
1754         /* we cant win if we are stopped */
1755         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1756                 return false;
1757         }       
1758
1759         /* we will automatically win if the other node is banned */
1760         if (em->node_flags & NODE_FLAGS_BANNED) {
1761                 return true;
1762         }
1763
1764         /* we will automatically win if the other node is banned */
1765         if (em->node_flags & NODE_FLAGS_STOPPED) {
1766                 return true;
1767         }
1768
1769         /* try to use the most connected node */
1770         if (cmp == 0) {
1771                 cmp = (int)myem.num_connected - (int)em->num_connected;
1772         }
1773
1774         /* then the longest running node */
1775         if (cmp == 0) {
1776                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1777         }
1778
1779         if (cmp == 0) {
1780                 cmp = (int)myem.pnn - (int)em->pnn;
1781         }
1782
1783         return cmp > 0;
1784 }
1785
1786 /*
1787   send out an election request
1788  */
1789 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1790 {
1791         int ret;
1792         TDB_DATA election_data;
1793         struct election_message emsg;
1794         uint64_t srvid;
1795         struct ctdb_context *ctdb = rec->ctdb;
1796
1797         srvid = CTDB_SRVID_RECOVERY;
1798
1799         ctdb_election_data(rec, &emsg);
1800
1801         election_data.dsize = sizeof(struct election_message);
1802         election_data.dptr  = (unsigned char *)&emsg;
1803
1804
1805         /* send an election message to all active nodes */
1806         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1807         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1808
1809
1810         /* A new node that is already frozen has entered the cluster.
1811            The existing nodes are not frozen and dont need to be frozen
1812            until the election has ended and we start the actual recovery
1813         */
1814         if (update_recmaster == true) {
1815                 /* first we assume we will win the election and set 
1816                    recoverymaster to be ourself on the current node
1817                  */
1818                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1819                 if (ret != 0) {
1820                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1821                         return -1;
1822                 }
1823         }
1824
1825
1826         return 0;
1827 }
1828
1829 /*
1830   this function will unban all nodes in the cluster
1831 */
1832 static void unban_all_nodes(struct ctdb_context *ctdb)
1833 {
1834         int ret, i;
1835         struct ctdb_node_map *nodemap;
1836         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1837         
1838         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1841                 return;
1842         }
1843
1844         for (i=0;i<nodemap->num;i++) {
1845                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1846                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1847                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1848                 }
1849         }
1850
1851         talloc_free(tmp_ctx);
1852 }
1853
1854
1855 /*
1856   we think we are winning the election - send a broadcast election request
1857  */
1858 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1859 {
1860         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1861         int ret;
1862
1863         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1866         }
1867
1868         talloc_free(rec->send_election_te);
1869         rec->send_election_te = NULL;
1870 }
1871
1872 /*
1873   handler for memory dumps
1874 */
1875 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1876                              TDB_DATA data, void *private_data)
1877 {
1878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1879         TDB_DATA *dump;
1880         int ret;
1881         struct rd_memdump_reply *rd;
1882
1883         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1884                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1885                 talloc_free(tmp_ctx);
1886                 return;
1887         }
1888         rd = (struct rd_memdump_reply *)data.dptr;
1889
1890         dump = talloc_zero(tmp_ctx, TDB_DATA);
1891         if (dump == NULL) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1893                 talloc_free(tmp_ctx);
1894                 return;
1895         }
1896         ret = ctdb_dump_memory(ctdb, dump);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1899                 talloc_free(tmp_ctx);
1900                 return;
1901         }
1902
1903 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1904
1905         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1906         if (ret != 0) {
1907                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1908                 talloc_free(tmp_ctx);
1909                 return;
1910         }
1911
1912         talloc_free(tmp_ctx);
1913 }
1914
1915 /*
1916   handler for reload_nodes
1917 */
1918 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1919                              TDB_DATA data, void *private_data)
1920 {
1921         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1922
1923         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1924
1925         reload_nodes_file(rec->ctdb);
1926 }
1927
1928
1929 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1930                               struct timeval yt, void *p)
1931 {
1932         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1933
1934         talloc_free(rec->ip_check_disable_ctx);
1935         rec->ip_check_disable_ctx = NULL;
1936 }
1937
1938
1939 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1940                              TDB_DATA data, void *private_data)
1941 {
1942         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1943         struct ctdb_public_ip *ip;
1944
1945         if (rec->recmaster != rec->ctdb->pnn) {
1946                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1947                 return;
1948         }
1949
1950         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1951                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1952                 return;
1953         }
1954
1955         ip = (struct ctdb_public_ip *)data.dptr;
1956
1957         update_ip_assignment_tree(rec->ctdb, ip);
1958 }
1959
1960
1961 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1962                              TDB_DATA data, void *private_data)
1963 {
1964         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1965         uint32_t timeout;
1966
1967         if (rec->ip_check_disable_ctx != NULL) {
1968                 talloc_free(rec->ip_check_disable_ctx);
1969                 rec->ip_check_disable_ctx = NULL;
1970         }
1971
1972         if (data.dsize != sizeof(uint32_t)) {
1973                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1974                                  "expexting %lu\n", (long unsigned)data.dsize,
1975                                  (long unsigned)sizeof(uint32_t)));
1976                 return;
1977         }
1978         if (data.dptr == NULL) {
1979                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1980                 return;
1981         }
1982
1983         timeout = *((uint32_t *)data.dptr);
1984         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1985
1986         rec->ip_check_disable_ctx = talloc_new(rec);
1987         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1988
1989         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1990 }
1991
1992
1993 /*
1994   handler for ip reallocate, just add it to the list of callers and 
1995   handle this later in the monitor_cluster loop so we do not recurse
1996   with other callers to takeover_run()
1997 */
1998 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1999                              TDB_DATA data, void *private_data)
2000 {
2001         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2002         struct ip_reallocate_list *caller;
2003
2004         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2005                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2006                 return;
2007         }
2008
2009         if (rec->ip_reallocate_ctx == NULL) {
2010                 rec->ip_reallocate_ctx = talloc_new(rec);
2011                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2012         }
2013
2014         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2015         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2016
2017         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2018         caller->next = rec->reallocate_callers;
2019         rec->reallocate_callers = caller;
2020
2021         return;
2022 }
2023
2024 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2025 {
2026         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2027         TDB_DATA result;
2028         int32_t ret;
2029         struct ip_reallocate_list *callers;
2030         uint32_t culprit;
2031
2032         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2033
2034         /* update the list of public ips that a node can handle for
2035            all connected nodes
2036         */
2037         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2040                                  culprit));
2041                 rec->need_takeover_run = true;
2042         }
2043         if (ret == 0) {
2044                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2045                 if (ret != 0) {
2046                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2047                                          culprit));
2048                         rec->need_takeover_run = true;
2049                 }
2050         }
2051
2052         result.dsize = sizeof(int32_t);
2053         result.dptr  = (uint8_t *)&ret;
2054
2055         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2056
2057                 /* Someone that sent srvid==0 does not want a reply */
2058                 if (callers->rd->srvid == 0) {
2059                         continue;
2060                 }
2061                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2062                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2063                                   (unsigned long long)callers->rd->srvid));
2064                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2065                 if (ret != 0) {
2066                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2067                                          "message to %u:%llu\n",
2068                                          (unsigned)callers->rd->pnn,
2069                                          (unsigned long long)callers->rd->srvid));
2070                 }
2071         }
2072
2073         talloc_free(tmp_ctx);
2074         talloc_free(rec->ip_reallocate_ctx);
2075         rec->ip_reallocate_ctx = NULL;
2076         rec->reallocate_callers = NULL;
2077         
2078 }
2079
2080
2081 /*
2082   handler for recovery master elections
2083 */
2084 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2085                              TDB_DATA data, void *private_data)
2086 {
2087         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2088         int ret;
2089         struct election_message *em = (struct election_message *)data.dptr;
2090         TALLOC_CTX *mem_ctx;
2091
2092         /* we got an election packet - update the timeout for the election */
2093         talloc_free(rec->election_timeout);
2094         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2095                                                 fast_start ?
2096                                                 timeval_current_ofs(0, 500000) :
2097                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2098                                                 ctdb_election_timeout, rec);
2099
2100         mem_ctx = talloc_new(ctdb);
2101
2102         /* someone called an election. check their election data
2103            and if we disagree and we would rather be the elected node, 
2104            send a new election message to all other nodes
2105          */
2106         if (ctdb_election_win(rec, em)) {
2107                 if (!rec->send_election_te) {
2108                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2109                                                                 timeval_current_ofs(0, 500000),
2110                                                                 election_send_request, rec);
2111                 }
2112                 talloc_free(mem_ctx);
2113                 /*unban_all_nodes(ctdb);*/
2114                 return;
2115         }
2116         
2117         /* we didn't win */
2118         talloc_free(rec->send_election_te);
2119         rec->send_election_te = NULL;
2120
2121         if (ctdb->tunable.verify_recovery_lock != 0) {
2122                 /* release the recmaster lock */
2123                 if (em->pnn != ctdb->pnn &&
2124                     ctdb->recovery_lock_fd != -1) {
2125                         close(ctdb->recovery_lock_fd);
2126                         ctdb->recovery_lock_fd = -1;
2127                         unban_all_nodes(ctdb);
2128                 }
2129         }
2130
2131         /* ok, let that guy become recmaster then */
2132         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2135                 talloc_free(mem_ctx);
2136                 return;
2137         }
2138
2139         talloc_free(mem_ctx);
2140         return;
2141 }
2142
2143
2144 /*
2145   force the start of the election process
2146  */
2147 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2148                            struct ctdb_node_map *nodemap)
2149 {
2150         int ret;
2151         struct ctdb_context *ctdb = rec->ctdb;
2152
2153         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2154
2155         /* set all nodes to recovery mode to stop all internode traffic */
2156         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2159                 return;
2160         }
2161
2162         talloc_free(rec->election_timeout);
2163         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2164                                                 fast_start ?
2165                                                 timeval_current_ofs(0, 500000) :
2166                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2167                                                 ctdb_election_timeout, rec);
2168
2169         ret = send_election_request(rec, pnn, true);
2170         if (ret!=0) {
2171                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2172                 return;
2173         }
2174
2175         /* wait for a few seconds to collect all responses */
2176         ctdb_wait_election(rec);
2177 }
2178
2179
2180
2181 /*
2182   handler for when a node changes its flags
2183 */
2184 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                             TDB_DATA data, void *private_data)
2186 {
2187         int ret;
2188         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2189         struct ctdb_node_map *nodemap=NULL;
2190         TALLOC_CTX *tmp_ctx;
2191         uint32_t changed_flags;
2192         int i;
2193         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2194         int disabled_flag_changed;
2195
2196         if (data.dsize != sizeof(*c)) {
2197                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2198                 return;
2199         }
2200
2201         tmp_ctx = talloc_new(ctdb);
2202         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2203
2204         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2205         if (ret != 0) {
2206                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2207                 talloc_free(tmp_ctx);
2208                 return;         
2209         }
2210
2211
2212         for (i=0;i<nodemap->num;i++) {
2213                 if (nodemap->nodes[i].pnn == c->pnn) break;
2214         }
2215
2216         if (i == nodemap->num) {
2217                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2218                 talloc_free(tmp_ctx);
2219                 return;
2220         }
2221
2222         changed_flags = c->old_flags ^ c->new_flags;
2223
2224         if (nodemap->nodes[i].flags != c->new_flags) {
2225                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2226         }
2227
2228         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2229
2230         nodemap->nodes[i].flags = c->new_flags;
2231
2232         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2233                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2234
2235         if (ret == 0) {
2236                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2238         }
2239         
2240         if (ret == 0 &&
2241             ctdb->recovery_master == ctdb->pnn &&
2242             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2243                 /* Only do the takeover run if the perm disabled or unhealthy
2244                    flags changed since these will cause an ip failover but not
2245                    a recovery.
2246                    If the node became disconnected or banned this will also
2247                    lead to an ip address failover but that is handled 
2248                    during recovery
2249                 */
2250                 if (disabled_flag_changed) {
2251                         rec->need_takeover_run = true;
2252                 }
2253         }
2254
2255         talloc_free(tmp_ctx);
2256 }
2257
2258 /*
2259   handler for when we need to push out flag changes ot all other nodes
2260 */
2261 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2262                             TDB_DATA data, void *private_data)
2263 {
2264         int ret;
2265         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2266         struct ctdb_node_map *nodemap=NULL;
2267         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2268         uint32_t recmaster;
2269         uint32_t *nodes;
2270
2271         /* find the recovery master */
2272         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2273         if (ret != 0) {
2274                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2275                 talloc_free(tmp_ctx);
2276                 return;
2277         }
2278
2279         /* read the node flags from the recmaster */
2280         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2283                 talloc_free(tmp_ctx);
2284                 return;
2285         }
2286         if (c->pnn >= nodemap->num) {
2287                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2288                 talloc_free(tmp_ctx);
2289                 return;
2290         }
2291
2292         /* send the flags update to all connected nodes */
2293         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2294
2295         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2296                                       nodes, 0, CONTROL_TIMEOUT(),
2297                                       false, data,
2298                                       NULL, NULL,
2299                                       NULL) != 0) {
2300                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2301
2302                 talloc_free(tmp_ctx);
2303                 return;
2304         }
2305
2306         talloc_free(tmp_ctx);
2307 }
2308
2309
2310 struct verify_recmode_normal_data {
2311         uint32_t count;
2312         enum monitor_result status;
2313 };
2314
2315 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2316 {
2317         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2318
2319
2320         /* one more node has responded with recmode data*/
2321         rmdata->count--;
2322
2323         /* if we failed to get the recmode, then return an error and let
2324            the main loop try again.
2325         */
2326         if (state->state != CTDB_CONTROL_DONE) {
2327                 if (rmdata->status == MONITOR_OK) {
2328                         rmdata->status = MONITOR_FAILED;
2329                 }
2330                 return;
2331         }
2332
2333         /* if we got a response, then the recmode will be stored in the
2334            status field
2335         */
2336         if (state->status != CTDB_RECOVERY_NORMAL) {
2337                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2338                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2339         }
2340
2341         return;
2342 }
2343
2344
2345 /* verify that all nodes are in normal recovery mode */
2346 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2347 {
2348         struct verify_recmode_normal_data *rmdata;
2349         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2350         struct ctdb_client_control_state *state;
2351         enum monitor_result status;
2352         int j;
2353         
2354         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2355         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2356         rmdata->count  = 0;
2357         rmdata->status = MONITOR_OK;
2358
2359         /* loop over all active nodes and send an async getrecmode call to 
2360            them*/
2361         for (j=0; j<nodemap->num; j++) {
2362                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2363                         continue;
2364                 }
2365                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2366                                         CONTROL_TIMEOUT(), 
2367                                         nodemap->nodes[j].pnn);
2368                 if (state == NULL) {
2369                         /* we failed to send the control, treat this as 
2370                            an error and try again next iteration
2371                         */                      
2372                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2373                         talloc_free(mem_ctx);
2374                         return MONITOR_FAILED;
2375                 }
2376
2377                 /* set up the callback functions */
2378                 state->async.fn = verify_recmode_normal_callback;
2379                 state->async.private_data = rmdata;
2380
2381                 /* one more control to wait for to complete */
2382                 rmdata->count++;
2383         }
2384
2385
2386         /* now wait for up to the maximum number of seconds allowed
2387            or until all nodes we expect a response from has replied
2388         */
2389         while (rmdata->count > 0) {
2390                 event_loop_once(ctdb->ev);
2391         }
2392
2393         status = rmdata->status;
2394         talloc_free(mem_ctx);
2395         return status;
2396 }
2397
2398
2399 struct verify_recmaster_data {
2400         struct ctdb_recoverd *rec;
2401         uint32_t count;
2402         uint32_t pnn;
2403         enum monitor_result status;
2404 };
2405
2406 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2407 {
2408         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2409
2410
2411         /* one more node has responded with recmaster data*/
2412         rmdata->count--;
2413
2414         /* if we failed to get the recmaster, then return an error and let
2415            the main loop try again.
2416         */
2417         if (state->state != CTDB_CONTROL_DONE) {
2418                 if (rmdata->status == MONITOR_OK) {
2419                         rmdata->status = MONITOR_FAILED;
2420                 }
2421                 return;
2422         }
2423
2424         /* if we got a response, then the recmaster will be stored in the
2425            status field
2426         */
2427         if (state->status != rmdata->pnn) {
2428                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2429                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2430                 rmdata->status = MONITOR_ELECTION_NEEDED;
2431         }
2432
2433         return;
2434 }
2435
2436
2437 /* verify that all nodes agree that we are the recmaster */
2438 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2439 {
2440         struct ctdb_context *ctdb = rec->ctdb;
2441         struct verify_recmaster_data *rmdata;
2442         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2443         struct ctdb_client_control_state *state;
2444         enum monitor_result status;
2445         int j;
2446         
2447         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2448         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2449         rmdata->rec    = rec;
2450         rmdata->count  = 0;
2451         rmdata->pnn    = pnn;
2452         rmdata->status = MONITOR_OK;
2453
2454         /* loop over all active nodes and send an async getrecmaster call to 
2455            them*/
2456         for (j=0; j<nodemap->num; j++) {
2457                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2458                         continue;
2459                 }
2460                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2461                                         CONTROL_TIMEOUT(),
2462                                         nodemap->nodes[j].pnn);
2463                 if (state == NULL) {
2464                         /* we failed to send the control, treat this as 
2465                            an error and try again next iteration
2466                         */                      
2467                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2468                         talloc_free(mem_ctx);
2469                         return MONITOR_FAILED;
2470                 }
2471
2472                 /* set up the callback functions */
2473                 state->async.fn = verify_recmaster_callback;
2474                 state->async.private_data = rmdata;
2475
2476                 /* one more control to wait for to complete */
2477                 rmdata->count++;
2478         }
2479
2480
2481         /* now wait for up to the maximum number of seconds allowed
2482            or until all nodes we expect a response from has replied
2483         */
2484         while (rmdata->count > 0) {
2485                 event_loop_once(ctdb->ev);
2486         }
2487
2488         status = rmdata->status;
2489         talloc_free(mem_ctx);
2490         return status;
2491 }
2492
2493
2494 /* called to check that the local allocation of public ip addresses is ok.
2495 */
2496 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2497 {
2498         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2499         struct ctdb_control_get_ifaces *ifaces = NULL;
2500         struct ctdb_all_public_ips *ips = NULL;
2501         struct ctdb_uptime *uptime1 = NULL;
2502         struct ctdb_uptime *uptime2 = NULL;
2503         int ret, j;
2504         bool need_iface_check = false;
2505         bool need_takeover_run = false;
2506
2507         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2508                                 CTDB_CURRENT_NODE, &uptime1);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2511                 talloc_free(mem_ctx);
2512                 return -1;
2513         }
2514
2515
2516         /* read the interfaces from the local node */
2517         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2518         if (ret != 0) {
2519                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2520                 talloc_free(mem_ctx);
2521                 return -1;
2522         }
2523
2524         if (!rec->ifaces) {
2525                 need_iface_check = true;
2526         } else if (rec->ifaces->num != ifaces->num) {
2527                 need_iface_check = true;
2528         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2529                 need_iface_check = true;
2530         }
2531
2532         if (need_iface_check) {
2533                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2534                                      "local node %u - force takeover run\n",
2535                                      pnn));
2536                 need_takeover_run = true;
2537         }
2538
2539         /* read the ip allocation from the local node */
2540         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2543                 talloc_free(mem_ctx);
2544                 return -1;
2545         }
2546
2547         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2548                                 CTDB_CURRENT_NODE, &uptime2);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2551                 talloc_free(mem_ctx);
2552                 return -1;
2553         }
2554
2555         /* skip the check if the startrecovery time has changed */
2556         if (timeval_compare(&uptime1->last_recovery_started,
2557                             &uptime2->last_recovery_started) != 0) {
2558                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2559                 talloc_free(mem_ctx);
2560                 return 0;
2561         }
2562
2563         /* skip the check if the endrecovery time has changed */
2564         if (timeval_compare(&uptime1->last_recovery_finished,
2565                             &uptime2->last_recovery_finished) != 0) {
2566                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2567                 talloc_free(mem_ctx);
2568                 return 0;
2569         }
2570
2571         /* skip the check if we have started but not finished recovery */
2572         if (timeval_compare(&uptime1->last_recovery_finished,
2573                             &uptime1->last_recovery_started) != 1) {
2574                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2575                 talloc_free(mem_ctx);
2576
2577                 return 0;
2578         }
2579
2580         talloc_free(rec->ifaces);
2581         rec->ifaces = talloc_steal(rec, ifaces);
2582
2583         /* verify that we have the ip addresses we should have
2584            and we dont have ones we shouldnt have.
2585            if we find an inconsistency we set recmode to
2586            active on the local node and wait for the recmaster
2587            to do a full blown recovery
2588         */
2589         for (j=0; j<ips->num; j++) {
2590                 if (ips->ips[j].pnn == pnn) {
2591                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2592                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2593                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2594                                 need_takeover_run = true;
2595                         }
2596                 } else {
2597                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2598                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2599                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2600                                 need_takeover_run = true;
2601                         }
2602                 }
2603         }
2604
2605         if (need_takeover_run) {
2606                 struct takeover_run_reply rd;
2607                 TDB_DATA data;
2608
2609                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2610
2611                 rd.pnn = ctdb->pnn;
2612                 rd.srvid = 0;
2613                 data.dptr = (uint8_t *)&rd;
2614                 data.dsize = sizeof(rd);
2615
2616                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2617                 if (ret != 0) {
2618                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2619                 }
2620         }
2621         talloc_free(mem_ctx);
2622         return 0;
2623 }
2624
2625
2626 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2627 {
2628         struct ctdb_node_map **remote_nodemaps = callback_data;
2629
2630         if (node_pnn >= ctdb->num_nodes) {
2631                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2632                 return;
2633         }
2634
2635         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2636
2637 }
2638
2639 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2640         struct ctdb_node_map *nodemap,
2641         struct ctdb_node_map **remote_nodemaps)
2642 {
2643         uint32_t *nodes;
2644
2645         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2646         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2647                                         nodes, 0,
2648                                         CONTROL_TIMEOUT(), false, tdb_null,
2649                                         async_getnodemap_callback,
2650                                         NULL,
2651                                         remote_nodemaps) != 0) {
2652                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2653
2654                 return -1;
2655         }
2656
2657         return 0;
2658 }
2659
2660 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2661 struct ctdb_check_reclock_state {
2662         struct ctdb_context *ctdb;
2663         struct timeval start_time;
2664         int fd[2];
2665         pid_t child;
2666         struct timed_event *te;
2667         struct fd_event *fde;
2668         enum reclock_child_status status;
2669 };
2670
2671 /* when we free the reclock state we must kill any child process.
2672 */
2673 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2674 {
2675         struct ctdb_context *ctdb = state->ctdb;
2676
2677         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2678
2679         if (state->fd[0] != -1) {
2680                 close(state->fd[0]);
2681                 state->fd[0] = -1;
2682         }
2683         if (state->fd[1] != -1) {
2684                 close(state->fd[1]);
2685                 state->fd[1] = -1;
2686         }
2687         kill(state->child, SIGKILL);
2688         return 0;
2689 }
2690
2691 /*
2692   called if our check_reclock child times out. this would happen if
2693   i/o to the reclock file blocks.
2694  */
2695 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2696                                          struct timeval t, void *private_data)
2697 {
2698         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2699                                            struct ctdb_check_reclock_state);
2700
2701         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2702         state->status = RECLOCK_TIMEOUT;
2703 }
2704
2705 /* this is called when the child process has completed checking the reclock
2706    file and has written data back to us through the pipe.
2707 */
2708 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2709                              uint16_t flags, void *private_data)
2710 {
2711         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2712                                              struct ctdb_check_reclock_state);
2713         char c = 0;
2714         int ret;
2715
2716         /* we got a response from our child process so we can abort the
2717            timeout.
2718         */
2719         talloc_free(state->te);
2720         state->te = NULL;
2721
2722         ret = read(state->fd[0], &c, 1);
2723         if (ret != 1 || c != RECLOCK_OK) {
2724                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2725                 state->status = RECLOCK_FAILED;
2726
2727                 return;
2728         }
2729
2730         state->status = RECLOCK_OK;
2731         return;
2732 }
2733
2734 static int check_recovery_lock(struct ctdb_context *ctdb)
2735 {
2736         int ret;
2737         struct ctdb_check_reclock_state *state;
2738         pid_t parent = getpid();
2739
2740         if (ctdb->recovery_lock_fd == -1) {
2741                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2742                 return -1;
2743         }
2744
2745         state = talloc(ctdb, struct ctdb_check_reclock_state);
2746         CTDB_NO_MEMORY(ctdb, state);
2747
2748         state->ctdb = ctdb;
2749         state->start_time = timeval_current();
2750         state->status = RECLOCK_CHECKING;
2751         state->fd[0] = -1;
2752         state->fd[1] = -1;
2753
2754         ret = pipe(state->fd);
2755         if (ret != 0) {
2756                 talloc_free(state);
2757                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2758                 return -1;
2759         }
2760
2761         state->child = fork();
2762         if (state->child == (pid_t)-1) {
2763                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2764                 close(state->fd[0]);
2765                 state->fd[0] = -1;
2766                 close(state->fd[1]);
2767                 state->fd[1] = -1;
2768                 talloc_free(state);
2769                 return -1;
2770         }
2771
2772         if (state->child == 0) {
2773                 char cc = RECLOCK_OK;
2774                 close(state->fd[0]);
2775                 state->fd[0] = -1;
2776
2777                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2778                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2779                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2780                         cc = RECLOCK_FAILED;
2781                 }
2782
2783                 write(state->fd[1], &cc, 1);
2784                 /* make sure we die when our parent dies */
2785                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2786                         sleep(5);
2787                         write(state->fd[1], &cc, 1);
2788                 }
2789                 _exit(0);
2790         }
2791         close(state->fd[1]);
2792         state->fd[1] = -1;
2793         set_close_on_exec(state->fd[0]);
2794
2795         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2796
2797         talloc_set_destructor(state, check_reclock_destructor);
2798
2799         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2800                                     ctdb_check_reclock_timeout, state);
2801         if (state->te == NULL) {
2802                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2803                 talloc_free(state);
2804                 return -1;
2805         }
2806
2807         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2808                                 EVENT_FD_READ,
2809                                 reclock_child_handler,
2810                                 (void *)state);
2811
2812         if (state->fde == NULL) {
2813                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2814                 talloc_free(state);
2815                 return -1;
2816         }
2817         tevent_fd_set_auto_close(state->fde);
2818
2819         while (state->status == RECLOCK_CHECKING) {
2820                 event_loop_once(ctdb->ev);
2821         }
2822
2823         if (state->status == RECLOCK_FAILED) {
2824                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2825                 close(ctdb->recovery_lock_fd);
2826                 ctdb->recovery_lock_fd = -1;
2827                 talloc_free(state);
2828                 return -1;
2829         }
2830
2831         talloc_free(state);
2832         return 0;
2833 }
2834
2835 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2836 {
2837         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2838         const char *reclockfile;
2839
2840         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2841                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2842                 talloc_free(tmp_ctx);
2843                 return -1;      
2844         }
2845
2846         if (reclockfile == NULL) {
2847                 if (ctdb->recovery_lock_file != NULL) {
2848                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2849                         talloc_free(ctdb->recovery_lock_file);
2850                         ctdb->recovery_lock_file = NULL;
2851                         if (ctdb->recovery_lock_fd != -1) {
2852                                 close(ctdb->recovery_lock_fd);
2853                                 ctdb->recovery_lock_fd = -1;
2854                         }
2855                 }
2856                 ctdb->tunable.verify_recovery_lock = 0;
2857                 talloc_free(tmp_ctx);
2858                 return 0;
2859         }
2860
2861         if (ctdb->recovery_lock_file == NULL) {
2862                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2863                 if (ctdb->recovery_lock_fd != -1) {
2864                         close(ctdb->recovery_lock_fd);
2865                         ctdb->recovery_lock_fd = -1;
2866                 }
2867                 talloc_free(tmp_ctx);
2868                 return 0;
2869         }
2870
2871
2872         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2873                 talloc_free(tmp_ctx);
2874                 return 0;
2875         }
2876
2877         talloc_free(ctdb->recovery_lock_file);
2878         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2879         ctdb->tunable.verify_recovery_lock = 0;
2880         if (ctdb->recovery_lock_fd != -1) {
2881                 close(ctdb->recovery_lock_fd);
2882                 ctdb->recovery_lock_fd = -1;
2883         }
2884
2885         talloc_free(tmp_ctx);
2886         return 0;
2887 }
2888
2889 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2890                       TALLOC_CTX *mem_ctx)
2891 {
2892         uint32_t pnn;
2893         struct ctdb_node_map *nodemap=NULL;
2894         struct ctdb_node_map *recmaster_nodemap=NULL;
2895         struct ctdb_node_map **remote_nodemaps=NULL;
2896         struct ctdb_vnn_map *vnnmap=NULL;
2897         struct ctdb_vnn_map *remote_vnnmap=NULL;
2898         int32_t debug_level;
2899         int i, j, ret;
2900
2901
2902
2903         /* verify that the main daemon is still running */
2904         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2905                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2906                 exit(-1);
2907         }
2908
2909         /* ping the local daemon to tell it we are alive */
2910         ctdb_ctrl_recd_ping(ctdb);
2911
2912         if (rec->election_timeout) {
2913                 /* an election is in progress */
2914                 return;
2915         }
2916
2917         /* read the debug level from the parent and update locally */
2918         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2919         if (ret !=0) {
2920                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2921                 return;
2922         }
2923         LogLevel = debug_level;
2924
2925
2926         /* We must check if we need to ban a node here but we want to do this
2927            as early as possible so we dont wait until we have pulled the node
2928            map from the local node. thats why we have the hardcoded value 20
2929         */
2930         for (i=0; i<ctdb->num_nodes; i++) {
2931                 struct ctdb_banning_state *ban_state;
2932
2933                 if (ctdb->nodes[i]->ban_state == NULL) {
2934                         continue;
2935                 }
2936                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2937                 if (ban_state->count < 20) {
2938                         continue;
2939                 }
2940                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2941                         ctdb->nodes[i]->pnn, ban_state->count,
2942                         ctdb->tunable.recovery_ban_period));
2943                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2944                 ban_state->count = 0;
2945         }
2946
2947         /* get relevant tunables */
2948         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2949         if (ret != 0) {
2950                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2951                 return;
2952         }
2953
2954         /* get the current recovery lock file from the server */
2955         if (update_recovery_lock_file(ctdb) != 0) {
2956                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2957                 return;
2958         }
2959
2960         /* Make sure that if recovery lock verification becomes disabled when
2961            we close the file
2962         */
2963         if (ctdb->tunable.verify_recovery_lock == 0) {
2964                 if (ctdb->recovery_lock_fd != -1) {
2965                         close(ctdb->recovery_lock_fd);
2966                         ctdb->recovery_lock_fd = -1;
2967                 }
2968         }
2969
2970         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2971         if (pnn == (uint32_t)-1) {
2972                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2973                 return;
2974         }
2975
2976         /* get the vnnmap */
2977         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2978         if (ret != 0) {
2979                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2980                 return;
2981         }
2982
2983
2984         /* get number of nodes */
2985         if (rec->nodemap) {
2986                 talloc_free(rec->nodemap);
2987                 rec->nodemap = NULL;
2988                 nodemap=NULL;
2989         }
2990         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2991         if (ret != 0) {
2992                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2993                 return;
2994         }
2995         nodemap = rec->nodemap;
2996
2997         /* check which node is the recovery master */
2998         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2999         if (ret != 0) {
3000                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3001                 return;
3002         }
3003
3004         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3005         if (rec->recmaster != pnn) {
3006                 if (rec->ip_reallocate_ctx != NULL) {
3007                         talloc_free(rec->ip_reallocate_ctx);
3008                         rec->ip_reallocate_ctx = NULL;
3009                         rec->reallocate_callers = NULL;
3010                 }
3011         }
3012
3013         if (rec->recmaster == (uint32_t)-1) {
3014                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3015                 force_election(rec, pnn, nodemap);
3016                 return;
3017         }
3018
3019
3020         /* if the local daemon is STOPPED, we verify that the databases are
3021            also frozen and thet the recmode is set to active 
3022         */
3023         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3024                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3025                 if (ret != 0) {
3026                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3027                 }
3028                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3029                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3030
3031                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3032                         if (ret != 0) {
3033                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3034                                 return;
3035                         }
3036                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3037                         if (ret != 0) {
3038                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3039
3040                                 return;
3041                         }
3042                         return;
3043                 }
3044         }
3045         /* If the local node is stopped, verify we are not the recmaster 
3046            and yield this role if so
3047         */
3048         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3049                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3050                 force_election(rec, pnn, nodemap);
3051                 return;
3052         }
3053         
3054         /* check that we (recovery daemon) and the local ctdb daemon
3055            agrees on whether we are banned or not
3056         */
3057 //qqq
3058
3059         /* remember our own node flags */
3060         rec->node_flags = nodemap->nodes[pnn].flags;
3061
3062         /* count how many active nodes there are */
3063         rec->num_active    = 0;
3064         rec->num_connected = 0;
3065         for (i=0; i<nodemap->num; i++) {
3066                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3067                         rec->num_active++;
3068                 }
3069                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3070                         rec->num_connected++;
3071                 }
3072         }
3073
3074
3075         /* verify that the recmaster node is still active */
3076         for (j=0; j<nodemap->num; j++) {
3077                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3078                         break;
3079                 }
3080         }
3081
3082         if (j == nodemap->num) {
3083                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3084                 force_election(rec, pnn, nodemap);
3085                 return;
3086         }
3087
3088         /* if recovery master is disconnected we must elect a new recmaster */
3089         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3090                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3091                 force_election(rec, pnn, nodemap);
3092                 return;
3093         }
3094
3095         /* grap the nodemap from the recovery master to check if it is banned */
3096         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3097                                    mem_ctx, &recmaster_nodemap);
3098         if (ret != 0) {
3099                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3100                           nodemap->nodes[j].pnn));
3101                 return;
3102         }
3103
3104
3105         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3106                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3107                 force_election(rec, pnn, nodemap);
3108                 return;
3109         }
3110
3111
3112         /* verify that we have all ip addresses we should have and we dont
3113          * have addresses we shouldnt have.
3114          */ 
3115         if (ctdb->do_checkpublicip) {
3116                 if (rec->ip_check_disable_ctx == NULL) {
3117                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3118                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3119                         }
3120                 }
3121         }
3122
3123
3124         /* if we are not the recmaster then we do not need to check
3125            if recovery is needed
3126          */
3127         if (pnn != rec->recmaster) {
3128                 return;
3129         }
3130
3131
3132         /* ensure our local copies of flags are right */
3133         ret = update_local_flags(rec, nodemap);
3134         if (ret == MONITOR_ELECTION_NEEDED) {
3135                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3136                 force_election(rec, pnn, nodemap);
3137                 return;
3138         }
3139         if (ret != MONITOR_OK) {
3140                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3141                 return;
3142         }
3143
3144         if (ctdb->num_nodes != nodemap->num) {
3145                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3146                 reload_nodes_file(ctdb);
3147                 return;
3148         }
3149
3150         /* verify that all active nodes agree that we are the recmaster */
3151         switch (verify_recmaster(rec, nodemap, pnn)) {
3152         case MONITOR_RECOVERY_NEEDED:
3153                 /* can not happen */
3154                 return;
3155         case MONITOR_ELECTION_NEEDED:
3156                 force_election(rec, pnn, nodemap);
3157                 return;
3158         case MONITOR_OK:
3159                 break;
3160         case MONITOR_FAILED:
3161                 return;
3162         }
3163
3164
3165         if (rec->need_recovery) {
3166                 /* a previous recovery didn't finish */
3167                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3168                 return;
3169         }
3170
3171         /* verify that all active nodes are in normal mode 
3172            and not in recovery mode 
3173         */
3174         switch (verify_recmode(ctdb, nodemap)) {
3175         case MONITOR_RECOVERY_NEEDED:
3176                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3177                 return;
3178         case MONITOR_FAILED:
3179                 return;
3180         case MONITOR_ELECTION_NEEDED:
3181                 /* can not happen */
3182         case MONITOR_OK:
3183                 break;
3184         }
3185
3186
3187         if (ctdb->tunable.verify_recovery_lock != 0) {
3188                 /* we should have the reclock - check its not stale */
3189                 ret = check_recovery_lock(ctdb);
3190                 if (ret != 0) {
3191                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3192                         ctdb_set_culprit(rec, ctdb->pnn);
3193                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3194                         return;
3195                 }
3196         }
3197
3198         /* if there are takeovers requested, perform it and notify the waiters */
3199         if (rec->reallocate_callers) {
3200                 process_ipreallocate_requests(ctdb, rec);
3201         }
3202
3203         /* get the nodemap for all active remote nodes
3204          */
3205         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3206         if (remote_nodemaps == NULL) {
3207                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3208                 return;
3209         }
3210         for(i=0; i<nodemap->num; i++) {
3211                 remote_nodemaps[i] = NULL;
3212         }
3213         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3214                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3215                 return;
3216         } 
3217
3218         /* verify that all other nodes have the same nodemap as we have
3219         */
3220         for (j=0; j<nodemap->num; j++) {
3221                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3222                         continue;
3223                 }
3224
3225                 if (remote_nodemaps[j] == NULL) {
3226                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3227                         ctdb_set_culprit(rec, j);
3228
3229                         return;
3230                 }
3231
3232                 /* if the nodes disagree on how many nodes there are
3233                    then this is a good reason to try recovery
3234                  */
3235                 if (remote_nodemaps[j]->num != nodemap->num) {
3236                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3237                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3238                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3239                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3240                         return;
3241                 }
3242
3243                 /* if the nodes disagree on which nodes exist and are
3244                    active, then that is also a good reason to do recovery
3245                  */
3246                 for (i=0;i<nodemap->num;i++) {
3247                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3248                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3249                                           nodemap->nodes[j].pnn, i, 
3250                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3251                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3252                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3253                                             vnnmap);
3254                                 return;
3255                         }
3256                 }
3257
3258                 /* verify the flags are consistent
3259                 */
3260                 for (i=0; i<nodemap->num; i++) {
3261                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3262                                 continue;
3263                         }
3264                         
3265                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3266                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3267                                   nodemap->nodes[j].pnn, 
3268                                   nodemap->nodes[i].pnn, 
3269                                   remote_nodemaps[j]->nodes[i].flags,
3270                                   nodemap->nodes[j].flags));
3271                                 if (i == j) {
3272                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3273                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3274                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3275                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3276                                                     vnnmap);
3277                                         return;
3278                                 } else {
3279                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3280                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3281                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3282                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3283                                                     vnnmap);
3284                                         return;
3285                                 }
3286                         }
3287                 }
3288         }
3289
3290
3291         /* there better be the same number of lmasters in the vnn map
3292            as there are active nodes or we will have to do a recovery
3293          */
3294         if (vnnmap->size != rec->num_active) {
3295                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3296                           vnnmap->size, rec->num_active));
3297                 ctdb_set_culprit(rec, ctdb->pnn);
3298                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3299                 return;
3300         }
3301
3302         /* verify that all active nodes in the nodemap also exist in 
3303            the vnnmap.
3304          */
3305         for (j=0; j<nodemap->num; j++) {
3306                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3307                         continue;
3308                 }
3309                 if (nodemap->nodes[j].pnn == pnn) {
3310                         continue;
3311                 }
3312
3313                 for (i=0; i<vnnmap->size; i++) {
3314                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3315                                 break;
3316                         }
3317                 }
3318                 if (i == vnnmap->size) {
3319                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3320                                   nodemap->nodes[j].pnn));
3321                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3322                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3323                         return;
3324                 }
3325         }
3326
3327         
3328         /* verify that all other nodes have the same vnnmap
3329            and are from the same generation
3330          */
3331         for (j=0; j<nodemap->num; j++) {
3332                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3333                         continue;
3334                 }
3335                 if (nodemap->nodes[j].pnn == pnn) {
3336                         continue;
3337                 }
3338
3339                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3340                                           mem_ctx, &remote_vnnmap);
3341                 if (ret != 0) {
3342                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3343                                   nodemap->nodes[j].pnn));
3344                         return;
3345                 }
3346
3347                 /* verify the vnnmap generation is the same */
3348                 if (vnnmap->generation != remote_vnnmap->generation) {
3349                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3350                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3351                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3352                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3353                         return;
3354                 }
3355
3356                 /* verify the vnnmap size is the same */
3357                 if (vnnmap->size != remote_vnnmap->size) {
3358                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3359                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3360                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3361                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3362                         return;
3363                 }
3364
3365                 /* verify the vnnmap is the same */
3366                 for (i=0;i<vnnmap->size;i++) {
3367                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3368                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3369                                           nodemap->nodes[j].pnn));
3370                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3371                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3372                                             vnnmap);
3373                                 return;
3374                         }
3375                 }
3376         }
3377
3378         /* we might need to change who has what IP assigned */
3379         if (rec->need_takeover_run) {
3380                 uint32_t culprit = (uint32_t)-1;
3381
3382                 rec->need_takeover_run = false;
3383
3384                 /* update the list of public ips that a node can handle for
3385                    all connected nodes
3386                 */
3387                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3388                 if (ret != 0) {
3389                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3390                                          culprit));
3391                         ctdb_set_culprit(rec, culprit);
3392                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3393                         return;
3394                 }
3395
3396                 /* execute the "startrecovery" event script on all nodes */
3397                 ret = run_startrecovery_eventscript(rec, nodemap);
3398                 if (ret!=0) {
3399                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3400                         ctdb_set_culprit(rec, ctdb->pnn);
3401                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3402                         return;
3403                 }
3404
3405                 ret = ctdb_takeover_run(ctdb, nodemap);
3406                 if (ret != 0) {
3407                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3408                         ctdb_set_culprit(rec, ctdb->pnn);
3409                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3410                         return;
3411                 }
3412
3413                 /* execute the "recovered" event script on all nodes */
3414                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3415 #if 0
3416 // we cant check whether the event completed successfully
3417 // since this script WILL fail if the node is in recovery mode
3418 // and if that race happens, the code here would just cause a second
3419 // cascading recovery.
3420                 if (ret!=0) {
3421                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3422                         ctdb_set_culprit(rec, ctdb->pnn);
3423                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3424                 }
3425 #endif
3426         }
3427 }
3428
3429 /*
3430   the main monitoring loop
3431  */
3432 static void monitor_cluster(struct ctdb_context *ctdb)
3433 {
3434         struct ctdb_recoverd *rec;
3435
3436         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3437
3438         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3439         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3440
3441         rec->ctdb = ctdb;
3442
3443         rec->priority_time = timeval_current();
3444
3445         /* register a message port for sending memory dumps */
3446         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3447
3448         /* register a message port for recovery elections */
3449         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3450
3451         /* when nodes are disabled/enabled */
3452         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3453
3454         /* when we are asked to puch out a flag change */
3455         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3456
3457         /* register a message port for vacuum fetch */
3458         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3459
3460         /* register a message port for reloadnodes  */
3461         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3462
3463         /* register a message port for performing a takeover run */
3464         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3465
3466         /* register a message port for disabling the ip check for a short while */
3467         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3468
3469         /* register a message port for updating the recovery daemons node assignment for an ip */
3470         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3471
3472         for (;;) {
3473                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3474                 struct timeval start;
3475                 double elapsed;
3476
3477                 if (!mem_ctx) {
3478                         DEBUG(DEBUG_CRIT,(__location__
3479                                           " Failed to create temp context\n"));
3480                         exit(-1);
3481                 }
3482
3483                 start = timeval_current();
3484                 main_loop(ctdb, rec, mem_ctx);
3485                 talloc_free(mem_ctx);
3486
3487                 /* we only check for recovery once every second */
3488                 elapsed = timeval_elapsed(&start);
3489                 if (elapsed < ctdb->tunable.recover_interval) {
3490                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3491                                           - elapsed);
3492                 }
3493         }
3494 }
3495
3496 /*
3497   event handler for when the main ctdbd dies
3498  */
3499 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3500                                  uint16_t flags, void *private_data)
3501 {
3502         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3503         _exit(1);
3504 }
3505
3506 /*
3507   called regularly to verify that the recovery daemon is still running
3508  */
3509 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3510                               struct timeval yt, void *p)
3511 {
3512         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3513
3514         if (kill(ctdb->recoverd_pid, 0) != 0) {
3515                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3516
3517                 ctdb_stop_recoverd(ctdb);
3518                 ctdb_stop_keepalive(ctdb);
3519                 ctdb_stop_monitoring(ctdb);
3520                 ctdb_release_all_ips(ctdb);
3521                 if (ctdb->methods != NULL) {
3522                         ctdb->methods->shutdown(ctdb);
3523                 }
3524                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3525
3526                 exit(10);       
3527         }
3528
3529         event_add_timed(ctdb->ev, ctdb, 
3530                         timeval_current_ofs(30, 0),
3531                         ctdb_check_recd, ctdb);
3532 }
3533
3534 static void recd_sig_child_handler(struct event_context *ev,
3535         struct signal_event *se, int signum, int count,
3536         void *dont_care, 
3537         void *private_data)
3538 {
3539 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3540         int status;
3541         pid_t pid = -1;
3542
3543         while (pid != 0) {
3544                 pid = waitpid(-1, &status, WNOHANG);
3545                 if (pid == -1) {
3546                         if (errno != ECHILD) {
3547                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3548                         }
3549                         return;
3550                 }
3551                 if (pid > 0) {
3552                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3553                 }
3554         }
3555 }
3556
3557 /*
3558   startup the recovery daemon as a child of the main ctdb daemon
3559  */
3560 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3561 {
3562         int fd[2];
3563         struct signal_event *se;
3564         struct tevent_fd *fde;
3565
3566         if (pipe(fd) != 0) {
3567                 return -1;
3568         }
3569
3570         ctdb->ctdbd_pid = getpid();
3571
3572         ctdb->recoverd_pid = fork();
3573         if (ctdb->recoverd_pid == -1) {
3574                 return -1;
3575         }
3576         
3577         if (ctdb->recoverd_pid != 0) {
3578                 close(fd[0]);
3579                 event_add_timed(ctdb->ev, ctdb, 
3580                                 timeval_current_ofs(30, 0),
3581                                 ctdb_check_recd, ctdb);
3582                 return 0;
3583         }
3584
3585         close(fd[1]);
3586
3587         srandom(getpid() ^ time(NULL));
3588
3589         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3590                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3591                 exit(1);
3592         }
3593
3594         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3595
3596         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3597                      ctdb_recoverd_parent, &fd[0]);     
3598         tevent_fd_set_auto_close(fde);
3599
3600         /* set up a handler to pick up sigchld */
3601         se = event_add_signal(ctdb->ev, ctdb,
3602                                      SIGCHLD, 0,
3603                                      recd_sig_child_handler,
3604                                      ctdb);
3605         if (se == NULL) {
3606                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3607                 exit(1);
3608         }
3609
3610         monitor_cluster(ctdb);
3611
3612         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3613         return -1;
3614 }
3615
3616 /*
3617   shutdown the recovery daemon
3618  */
3619 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3620 {
3621         if (ctdb->recoverd_pid == 0) {
3622                 return;
3623         }
3624
3625         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3626         kill(ctdb->recoverd_pid, SIGTERM);
3627 }