1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
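/* the control and monitor timeouts below are derived from the
   recover_timeout and recover_interval tunables of this ctdb context */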
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
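                /* freeze the databases one priority level at a time;
                   database priorities are 1-based, hence the loop from 1
                   through NUM_DB_PRIORITIES inclusive */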
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remote node has not yet been upgraded to
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
435                                                   mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
441                                                  mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
497                                                   remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
504                                                  remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
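                        /* keep the existing record unless the pulled record
                           has a higher rsn, or the same rsn while the
                           existing record's dmaster is not the recovery
                           master */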
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
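                /* the record is owned by another node: send a no-op call
                   (CTDB_NULL_FUNC) with CTDB_IMMEDIATE_MIGRATION so the
                   record is migrated here; the callback resumes processing
                   the rest of the list once this call completes */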
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
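        /* the reqid of the first marshalled record is used to carry the pnn
           of the node these records came from */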
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
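        /* convert the fractional part of 'secs' into microseconds for the
           timeval offset below */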
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finishes election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are, or we believe we are, the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 talloc_free(mem_ctx);
                                return MONITOR_FAILED;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation id.
988    The generation id cannot be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
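        /* the flags below assume the recovery db is only ever touched by
           this daemon (so locking can be skipped), and that mmap should be
           avoided when running under valgrind */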
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevant records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
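        /* grow the marshall buffer and append this record at the current
           offset (params->len), then account for the new length */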
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u bytes\n",
1079                          rec->length + params->len));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x (%u records)\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
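  (the sequence: pull every active node's copy into a temporary recdb,
   merging records by rsn, wipe the database on all nodes inside the
   recovery transaction, then push the merged contents back out)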
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 talloc_free(recdb);
                return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (rec->ip_check_disable_ctx == NULL) {
1281                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1282                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1283                                 rec->need_takeover_run = true;
1284                         }
1285                 }
1286
1287                 /* grab a new shiny list of available public ips from the node */
1288                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1289                                         CONTROL_TIMEOUT(),
1290                                         ctdb->nodes[j]->pnn,
1291                                         ctdb->nodes,
1292                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1293                                         &ctdb->nodes[j]->available_public_ips);
1294                 if (ret != 0) {
1295                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1296                                 ctdb->nodes[j]->pnn));
1297                         if (culprit) {
1298                                 *culprit = ctdb->nodes[j]->pnn;
1299                         }
1300                         return -1;
1301                 }
1302         }
1303
1304         return 0;
1305 }
1306
1307 /* when we start a recovery, make sure all nodes use the same reclock file
1308    setting
1309 */
1310 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1311 {
1312         struct ctdb_context *ctdb = rec->ctdb;
1313         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316
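        /* when no reclock file is configured an empty payload is sent,
           which (presumably) tells the remote nodes to clear their reclock
           file setting */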
1317         if (ctdb->recovery_lock_file == NULL) {
1318                 data.dptr  = NULL;
1319                 data.dsize = 0;
1320         } else {
1321                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1322                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1323         }
1324
1325         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1326         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1327                                         nodes, 0,
1328                                         CONTROL_TIMEOUT(),
1329                                         false, data,
1330                                         NULL, NULL,
1331                                         rec) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1333                 talloc_free(tmp_ctx);
1334                 return -1;
1335         }
1336
1337         talloc_free(tmp_ctx);
1338         return 0;
1339 }
1340
1341
1342 /*
1343   we are the recmaster, and recovery is needed - start a recovery run
1344  */
1345 static int do_recovery(struct ctdb_recoverd *rec, 
1346                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1347                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1348 {
1349         struct ctdb_context *ctdb = rec->ctdb;
1350         int i, j, ret;
1351         uint32_t generation;
1352         struct ctdb_dbid_map *dbmap;
1353         TDB_DATA data;
1354         uint32_t *nodes;
1355         struct timeval start_time;
1356         uint32_t culprit = (uint32_t)-1;
1357
1358         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1359
1360         /* if recovery fails, force it again */
1361         rec->need_recovery = true;
1362
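        /* ban any node that has accumulated 2*num_nodes culprit credits
           since it last behaved for a full grace period */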
1363         for (i=0; i<ctdb->num_nodes; i++) {
1364                 struct ctdb_banning_state *ban_state;
1365
1366                 if (ctdb->nodes[i]->ban_state == NULL) {
1367                         continue;
1368                 }
1369                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1370                 if (ban_state->count < 2*ctdb->num_nodes) {
1371                         continue;
1372                 }
1373                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1374                         ctdb->nodes[i]->pnn, ban_state->count,
1375                         ctdb->tunable.recovery_ban_period));
1376                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1377                 ban_state->count = 0;
1378         }
1379
1380
1381         if (ctdb->tunable.verify_recovery_lock != 0) {
1382                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1383                 start_time = timeval_current();
1384                 if (!ctdb_recovery_lock(ctdb, true)) {
1385                         ctdb_set_culprit(rec, pnn);
1386                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1387                         return -1;
1388                 }
1389                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1390                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1391         }
1392
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1394
1395         /* get a list of all databases */
1396         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1397         if (ret != 0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1399                 return -1;
1400         }
1401
1402         /* we do the db creation before we set the recovery mode, so the freeze happens
1403            on all databases we will be dealing with. */
1404
1405         /* verify that we have all the databases any other node has */
1406         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1407         if (ret != 0) {
1408                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1409                 return -1;
1410         }
1411
1412         /* verify that all other nodes have all our databases */
1413         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1416                 return -1;
1417         }
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1419
1420         /* update the database priority for all remote databases */
1421         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1422         if (ret != 0) {
1423                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1424         }
1425         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1426
1427
1428         /* update all other nodes to use the same setting for reclock files
1429            as the local recovery master.
1430         */
1431         sync_recovery_lock_file_across_cluster(rec);
1432
1433         /* set recovery mode to active on all nodes */
1434         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1437                 return -1;
1438         }
1439
1440         /* execute the "startrecovery" event script on all nodes */
1441         ret = run_startrecovery_eventscript(rec, nodemap);
1442         if (ret!=0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1444                 return -1;
1445         }
1446
1447         /*
1448           update all nodes to have the same flags that we have
1449          */
1450         for (i=0;i<nodemap->num;i++) {
1451                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1452                         continue;
1453                 }
1454
1455                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1456                 if (ret != 0) {
1457                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1458                         return -1;
1459                 }
1460         }
1461
1462         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1463
1464         /* pick a new generation number */
1465         generation = new_generation();
1466
1467         /* change the vnnmap on this node to use the new generation 
1468            number but not on any other nodes.
1469            this guarantees that if we abort the recovery prematurely
1470            for some reason (a node stops responding?)
1471            that we can just return immediately and we will reenter
1472            recovery shortly again.
1473            I.e. we deliberately leave the cluster with an inconsistent
1474            generation id to allow us to abort recovery at any stage and
1475            just restart it from scratch.
1476          */
1477         vnnmap->generation = generation;
1478         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1479         if (ret != 0) {
1480                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1481                 return -1;
1482         }
1483
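             /* the generation number is passed as the payload of the
                TRANSACTION_START and TRANSACTION_COMMIT controls below, so
                all active nodes operate on the same recovery generation */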
1484         data.dptr = (void *)&generation;
1485         data.dsize = sizeof(uint32_t);
1486
1487         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1488         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1489                                         nodes, 0,
1490                                         CONTROL_TIMEOUT(), false, data,
1491                                         NULL,
1492                                         transaction_start_fail_callback,
1493                                         rec) != 0) {
1494                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1495                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1496                                         nodes, 0,
1497                                         CONTROL_TIMEOUT(), false, tdb_null,
1498                                         NULL,
1499                                         NULL,
1500                                         NULL) != 0) {
1501                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1502                 }
1503                 return -1;
1504         }
1505
1506         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1507
1508         for (i=0;i<dbmap->num;i++) {
1509                 ret = recover_database(rec, mem_ctx,
1510                                        dbmap->dbs[i].dbid,
1511                                        dbmap->dbs[i].persistent,
1512                                        pnn, nodemap, generation);
1513                 if (ret != 0) {
1514                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1515                         return -1;
1516                 }
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1520
1521         /* commit all the changes */
1522         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1523                                         nodes, 0,
1524                                         CONTROL_TIMEOUT(), false, data,
1525                                         NULL, NULL,
1526                                         NULL) != 0) {
1527                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1528                 return -1;
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1532         
1533
1534         /* update the capabilities for all nodes */
1535         ret = update_capabilities(ctdb, nodemap);
1536         if (ret!=0) {
1537                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1538                 return -1;
1539         }
1540
1541         /* build a new vnn map with all the currently active and
1542            unbanned nodes */
1543         generation = new_generation();
1544         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1545         CTDB_NO_MEMORY(ctdb, vnnmap);
1546         vnnmap->generation = generation;
1547         vnnmap->size = 0;
1548         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1549         CTDB_NO_MEMORY(ctdb, vnnmap->map);
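             /* add every active node that has the LMASTER capability;
                j tracks the next free slot in the map */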
1550         for (i=j=0;i<nodemap->num;i++) {
1551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1552                         continue;
1553                 }
1554                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1555                         /* this node cannot be an lmaster */
1556                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1557                         continue;
1558                 }
1559
1560                 vnnmap->size++;
1561                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1562                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1563                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1564
1565         }
1566         if (vnnmap->size == 0) {
1567                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1568                 vnnmap->size++;
1569                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1570                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1571                 vnnmap->map[0] = pnn;
1572         }       
1573
1574         /* update to the new vnnmap on all nodes */
1575         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1576         if (ret != 0) {
1577                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1578                 return -1;
1579         }
1580
1581         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1582
1583         /* update recmaster to point to us for all nodes */
1584         ret = set_recovery_master(ctdb, nodemap, pnn);
1585         if (ret!=0) {
1586                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1587                 return -1;
1588         }
1589
1590         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1591
1592         /*
1593           update all nodes to have the same flags that we have
1594          */
1595         for (i=0;i<nodemap->num;i++) {
1596                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1597                         continue;
1598                 }
1599
1600                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1601                 if (ret != 0) {
1602                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1603                         return -1;
1604                 }
1605         }
1606
1607         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1608
1609         /* disable recovery mode */
1610         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1611         if (ret != 0) {
1612                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1613                 return -1;
1614         }
1615
1616         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1617
1618         /*
1619           tell nodes to takeover their public IPs
1620          */
1621         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1622         if (ret != 0) {
1623                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1624                                  culprit));
1625                 return -1;
1626         }
1627         rec->need_takeover_run = false;
1628         ret = ctdb_takeover_run(ctdb, nodemap);
1629         if (ret != 0) {
1630                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1631                 return -1;
1632         }
1633         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1634
1635         /* execute the "recovered" event script on all nodes */
1636         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1637         if (ret!=0) {
1638                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1639                 return -1;
1640         }
1641
1642         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1643
1644         /* send a message to all clients telling them that the cluster 
1645            has been reconfigured */
1646         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1647
1648         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1649
1650         rec->need_recovery = false;
1651
1652         /* we managed to complete a full recovery, make sure to forgive
1653            any past sins by the nodes that could now participate in the
1654            recovery.
1655         */
1656         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1657         for (i=0;i<nodemap->num;i++) {
1658                 struct ctdb_banning_state *ban_state;
1659
1660                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1661                         continue;
1662                 }
1663
1664                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1665                 if (ban_state == NULL) {
1666                         continue;
1667                 }
1668
1669                 ban_state->count = 0;
1670         }
1671
1672
1673         /* We just finished a recovery successfully. 
1674            We now wait for rerecovery_timeout before we allow 
1675            another recovery to take place.
1676         */
1677         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1678         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1679         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1680
1681         return 0;
1682 }
1683
1684
1685 /*
1686   elections are won by first checking the number of connected nodes, then
1687   the priority time, then the pnn
1688  */
1689 struct election_message {
1690         uint32_t num_connected;
1691         struct timeval priority_time;
1692         uint32_t pnn;
1693         uint32_t node_flags;
1694 };
1695
1696 /*
1697   form this node's election data
1698  */
1699 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1700 {
1701         int ret, i;
1702         struct ctdb_node_map *nodemap;
1703         struct ctdb_context *ctdb = rec->ctdb;
1704
1705         ZERO_STRUCTP(em);
1706
1707         em->pnn = rec->ctdb->pnn;
1708         em->priority_time = rec->priority_time;
1709
1710         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1711         if (ret != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1713                 return;
1714         }
1715
1716         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1717         em->node_flags = rec->node_flags;
1718
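             /* count how many nodes we are connected to; the election
                favours the node that can see the largest part of the
                cluster */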
1719         for (i=0;i<nodemap->num;i++) {
1720                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1721                         em->num_connected++;
1722                 }
1723         }
1724
1725         /* we shouldn't try to win this election if we can't be a recmaster */
1726         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1727                 em->num_connected = 0;
1728                 em->priority_time = timeval_current();
1729         }
1730
1731         talloc_free(nodemap);
1732 }
1733
1734 /*
1735   see if the given election data wins
1736  */
1737 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1738 {
1739         struct election_message myem;
1740         int cmp = 0;
1741
1742         ctdb_election_data(rec, &myem);
1743
1744         /* we can't win if we don't have the recmaster capability */
1745         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1746                 return false;
1747         }
1748
1749         /* we can't win if we are banned */
1750         if (rec->node_flags & NODE_FLAGS_BANNED) {
1751                 return false;
1752         }       
1753
1754         /* we can't win if we are stopped */
1755         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1756                 return false;
1757         }       
1758
1759         /* we will automatically win if the other node is banned */
1760         if (em->node_flags & NODE_FLAGS_BANNED) {
1761                 return true;
1762         }
1763
1764         /* we will automatically win if the other node is stopped */
1765         if (em->node_flags & NODE_FLAGS_STOPPED) {
1766                 return true;
1767         }
1768
1769         /* try to use the most connected node */
1770         if (cmp == 0) {
1771                 cmp = (int)myem.num_connected - (int)em->num_connected;
1772         }
1773
1774         /* then the longest running node */
1775         if (cmp == 0) {
1776                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1777         }
1778
1779         if (cmp == 0) {
1780                 cmp = (int)myem.pnn - (int)em->pnn;
1781         }
1782
1783         return cmp > 0;
1784 }
1785
1786 /*
1787   send out an election request
1788  */
1789 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1790 {
1791         int ret;
1792         TDB_DATA election_data;
1793         struct election_message emsg;
1794         uint64_t srvid;
1795         struct ctdb_context *ctdb = rec->ctdb;
1796
1797         srvid = CTDB_SRVID_RECOVERY;
1798
1799         ctdb_election_data(rec, &emsg);
1800
1801         election_data.dsize = sizeof(struct election_message);
1802         election_data.dptr  = (unsigned char *)&emsg;
1803
1804
1805         /* send an election message to all active nodes */
1806         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1807         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1808
1809
1810         /* A new node that is already frozen has entered the cluster.
1811            The existing nodes are not frozen and don't need to be frozen
1812            until the election has ended and we start the actual recovery
1813         */
1814         if (update_recmaster == true) {
1815                 /* first we assume we will win the election and set 
1816                    recoverymaster to be ourself on the current node
1817                  */
1818                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1819                 if (ret != 0) {
1820                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1821                         return -1;
1822                 }
1823         }
1824
1825
1826         return 0;
1827 }
1828
1829 /*
1830   this function will unban all nodes in the cluster
1831 */
1832 static void unban_all_nodes(struct ctdb_context *ctdb)
1833 {
1834         int ret, i;
1835         struct ctdb_node_map *nodemap;
1836         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1837         
1838         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1841                 return;
1842         }
1843
1844         for (i=0;i<nodemap->num;i++) {
1845                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1846                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1847                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1848                 }
1849         }
1850
1851         talloc_free(tmp_ctx);
1852 }
1853
1854
1855 /*
1856   we think we are winning the election - send a broadcast election request
1857  */
1858 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1859 {
1860         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1861         int ret;
1862
1863         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1866         }
1867
1868         talloc_free(rec->send_election_te);
1869         rec->send_election_te = NULL;
1870 }
1871
1872 /*
1873   handler for memory dumps
1874 */
1875 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1876                              TDB_DATA data, void *private_data)
1877 {
1878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1879         TDB_DATA *dump;
1880         int ret;
1881         struct rd_memdump_reply *rd;
1882
1883         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1884                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1885                 talloc_free(tmp_ctx);
1886                 return;
1887         }
1888         rd = (struct rd_memdump_reply *)data.dptr;
1889
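             /* collect a talloc memory report and send it back to the pnn
                and srvid supplied by the requester */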
1890         dump = talloc_zero(tmp_ctx, TDB_DATA);
1891         if (dump == NULL) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1893                 talloc_free(tmp_ctx);
1894                 return;
1895         }
1896         ret = ctdb_dump_memory(ctdb, dump);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1899                 talloc_free(tmp_ctx);
1900                 return;
1901         }
1902
1903         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1904
1905         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1906         if (ret != 0) {
1907                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1908                 talloc_free(tmp_ctx);
1909                 return;
1910         }
1911
1912         talloc_free(tmp_ctx);
1913 }
1914
1915 /*
1916   handler for reload_nodes
1917 */
1918 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1919                              TDB_DATA data, void *private_data)
1920 {
1921         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1922
1923         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1924
1925         reload_nodes_file(rec->ctdb);
1926 }
1927
1928
1929 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1930                               struct timeval yt, void *p)
1931 {
1932         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1933
1934         talloc_free(rec->ip_check_disable_ctx);
1935         rec->ip_check_disable_ctx = NULL;
1936 }
1937
1938
1939 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1940                              TDB_DATA data, void *private_data)
1941 {
1942         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1943         struct ctdb_public_ip *ip;
1944
1945         if (rec->recmaster != rec->ctdb->pnn) {
1946                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1947                 return;
1948         }
1949
1950         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1951                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1952                 return;
1953         }
1954
1955         ip = (struct ctdb_public_ip *)data.dptr;
1956
1957         update_ip_assignment_tree(rec->ctdb, ip);
1958 }
1959
1960
1961 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1962                              TDB_DATA data, void *private_data)
1963 {
1964         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1965         uint32_t timeout;
1966
1967         if (rec->ip_check_disable_ctx != NULL) {
1968                 talloc_free(rec->ip_check_disable_ctx);
1969                 rec->ip_check_disable_ctx = NULL;
1970         }
1971
1972         if (data.dsize != sizeof(uint32_t)) {
1973                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1974                                  "expecting %lu\n", (long unsigned)data.dsize,
1975                                  (long unsigned)sizeof(uint32_t)));
1976                 return;
1977         }
1978         if (data.dptr == NULL) {
1979                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1980                 return;
1981         }
1982
1983         timeout = *((uint32_t *)data.dptr);
1984         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1985
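             /* the ip check stays disabled for as long as
                ip_check_disable_ctx is non-NULL; the timed event below frees
                it again via reenable_ip_check() once the timeout expires */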
1986         rec->ip_check_disable_ctx = talloc_new(rec);
1987         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1988
1989         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1990 }
1991
1992
1993 /*
1994   handler for ip reallocate, just add it to the list of callers and 
1995   handle this later in the monitor_cluster loop so we do not recurse
1996   with other callers to takeover_run()
1997 */
1998 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1999                              TDB_DATA data, void *private_data)
2000 {
2001         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2002         struct ip_reallocate_list *caller;
2003
2004         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2005                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2006                 return;
2007         }
2008
2009         if (rec->ip_reallocate_ctx == NULL) {
2010                 rec->ip_reallocate_ctx = talloc_new(rec);
2011                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2012         }
2013
2014         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2015         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2016
2017         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2018         caller->next = rec->reallocate_callers;
2019         rec->reallocate_callers = caller;
2020
2021         return;
2022 }
2023
2024 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2025 {
2026         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2027         TDB_DATA result;
2028         int32_t ret;
2029         struct ip_reallocate_list *callers;
2030         uint32_t culprit;
2031
2032         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2033
2034         /* update the list of public ips that a node can handle for
2035            all connected nodes
2036         */
2037         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2040                                  culprit));
2041                 rec->need_takeover_run = true;
2042         }
2043         if (ret == 0) {
2044                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2045                 if (ret != 0) {
2046                         DEBUG(DEBUG_ERR,("ctdb_takeover_run() failed to "
2047                                          "reallocate public addresses\n"));
2048                         rec->need_takeover_run = true;
2049                 }
2050         }
2051
2052         result.dsize = sizeof(int32_t);
2053         result.dptr  = (uint8_t *)&ret;
2054
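             /* send the outcome back to every queued "ctdb ipreallocate" caller */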
2055         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2056
2057                 /* Someone that sent srvid==0 does not want a reply */
2058                 if (callers->rd->srvid == 0) {
2059                         continue;
2060                 }
2061                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2062                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2063                                   (unsigned long long)callers->rd->srvid));
2064                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2065                 if (ret != 0) {
2066                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2067                                          "message to %u:%llu\n",
2068                                          (unsigned)callers->rd->pnn,
2069                                          (unsigned long long)callers->rd->srvid));
2070                 }
2071         }
2072
2073         talloc_free(tmp_ctx);
2074         talloc_free(rec->ip_reallocate_ctx);
2075         rec->ip_reallocate_ctx = NULL;
2076         rec->reallocate_callers = NULL;
2077         
2078 }
2079
2080
2081 /*
2082   handler for recovery master elections
2083 */
2084 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2085                              TDB_DATA data, void *private_data)
2086 {
2087         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2088         int ret;
2089         struct election_message *em = (struct election_message *)data.dptr;
2090         TALLOC_CTX *mem_ctx;
2091
2092         /* we got an election packet - update the timeout for the election */
2093         talloc_free(rec->election_timeout);
2094         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2095                                                 fast_start ?
2096                                                 timeval_current_ofs(0, 500000) :
2097                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2098                                                 ctdb_election_timeout, rec);
2099
2100         mem_ctx = talloc_new(ctdb);
2101
2102         /* someone called an election. check their election data
2103            and if we disagree and we would rather be the elected node, 
2104            send a new election message to all other nodes
2105          */
2106         if (ctdb_election_win(rec, em)) {
2107                 if (!rec->send_election_te) {
2108                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2109                                                                 timeval_current_ofs(0, 500000),
2110                                                                 election_send_request, rec);
2111                 }
2112                 talloc_free(mem_ctx);
2113                 /*unban_all_nodes(ctdb);*/
2114                 return;
2115         }
2116         
2117         /* we didn't win */
2118         talloc_free(rec->send_election_te);
2119         rec->send_election_te = NULL;
2120
2121         if (ctdb->tunable.verify_recovery_lock != 0) {
2122                 /* release the recmaster lock */
2123                 if (em->pnn != ctdb->pnn &&
2124                     ctdb->recovery_lock_fd != -1) {
2125                         close(ctdb->recovery_lock_fd);
2126                         ctdb->recovery_lock_fd = -1;
2127                         unban_all_nodes(ctdb);
2128                 }
2129         }
2130
2131         /* ok, let that guy become recmaster then */
2132         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2135                 talloc_free(mem_ctx);
2136                 return;
2137         }
2138
2139         talloc_free(mem_ctx);
2140         return;
2141 }
2142
2143
2144 /*
2145   force the start of the election process
2146  */
2147 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2148                            struct ctdb_node_map *nodemap)
2149 {
2150         int ret;
2151         struct ctdb_context *ctdb = rec->ctdb;
2152
2153         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2154
2155         /* set all nodes to recovery mode to stop all internode traffic */
2156         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2159                 return;
2160         }
2161
2162         talloc_free(rec->election_timeout);
2163         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2164                                                 fast_start ?
2165                                                 timeval_current_ofs(0, 500000) :
2166                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2167                                                 ctdb_election_timeout, rec);
2168
2169         ret = send_election_request(rec, pnn, true);
2170         if (ret!=0) {
2171                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2172                 return;
2173         }
2174
2175         /* wait for a few seconds to collect all responses */
2176         ctdb_wait_election(rec);
2177 }
2178
2179
2180
2181 /*
2182   handler for when a node changes its flags
2183 */
2184 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                             TDB_DATA data, void *private_data)
2186 {
2187         int ret;
2188         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2189         struct ctdb_node_map *nodemap=NULL;
2190         TALLOC_CTX *tmp_ctx;
2191         uint32_t changed_flags;
2192         int i;
2193         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2194         int disabled_flag_changed;
2195
2196         if (data.dsize != sizeof(*c)) {
2197                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2198                 return;
2199         }
2200
2201         tmp_ctx = talloc_new(ctdb);
2202         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2203
2204         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2205         if (ret != 0) {
2206                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2207                 talloc_free(tmp_ctx);
2208                 return;         
2209         }
2210
2211
2212         for (i=0;i<nodemap->num;i++) {
2213                 if (nodemap->nodes[i].pnn == c->pnn) break;
2214         }
2215
2216         if (i == nodemap->num) {
2217                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2218                 talloc_free(tmp_ctx);
2219                 return;
2220         }
2221
2222         changed_flags = c->old_flags ^ c->new_flags;
2223
2224         if (nodemap->nodes[i].flags != c->new_flags) {
2225                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2226         }
2227
2228         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2229
2230         nodemap->nodes[i].flags = c->new_flags;
2231
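             /* only act on the change if we are the recovery master and the
                cluster is in normal (non-recovery) mode */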
2232         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2233                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2234
2235         if (ret == 0) {
2236                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2238         }
2239         
2240         if (ret == 0 &&
2241             ctdb->recovery_master == ctdb->pnn &&
2242             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2243                 /* Only do the takeover run if the perm disabled or unhealthy
2244                    flags changed since these will cause an ip failover but not
2245                    a recovery.
2246                    If the node became disconnected or banned this will also
2247                    lead to an ip address failover but that is handled 
2248                    during recovery
2249                 */
2250                 if (disabled_flag_changed) {
2251                         rec->need_takeover_run = true;
2252                 }
2253         }
2254
2255         talloc_free(tmp_ctx);
2256 }
2257
2258 /*
2259   handler for when we need to push out flag changes to all other nodes
2260 */
2261 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2262                             TDB_DATA data, void *private_data)
2263 {
2264         int ret;
2265         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2266         struct ctdb_node_map *nodemap=NULL;
2267         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2268         uint32_t recmaster;
2269         uint32_t *nodes;
2270
2271         /* find the recovery master */
2272         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2273         if (ret != 0) {
2274                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2275                 talloc_free(tmp_ctx);
2276                 return;
2277         }
2278
2279         /* read the node flags from the recmaster */
2280         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2283                 talloc_free(tmp_ctx);
2284                 return;
2285         }
2286         if (c->pnn >= nodemap->num) {
2287                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2288                 talloc_free(tmp_ctx);
2289                 return;
2290         }
2291
2292         /* send the flags update to all connected nodes */
2293         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2294
2295         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2296                                       nodes, 0, CONTROL_TIMEOUT(),
2297                                       false, data,
2298                                       NULL, NULL,
2299                                       NULL) != 0) {
2300                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2301
2302                 talloc_free(tmp_ctx);
2303                 return;
2304         }
2305
2306         talloc_free(tmp_ctx);
2307 }
2308
2309
2310 struct verify_recmode_normal_data {
2311         uint32_t count;
2312         enum monitor_result status;
2313 };
2314
2315 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2316 {
2317         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2318
2319
2320         /* one more node has responded with recmode data */
2321         rmdata->count--;
2322
2323         /* if we failed to get the recmode, then return an error and let
2324            the main loop try again.
2325         */
2326         if (state->state != CTDB_CONTROL_DONE) {
2327                 if (rmdata->status == MONITOR_OK) {
2328                         rmdata->status = MONITOR_FAILED;
2329                 }
2330                 return;
2331         }
2332
2333         /* if we got a response, then the recmode will be stored in the
2334            status field
2335         */
2336         if (state->status != CTDB_RECOVERY_NORMAL) {
2337                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2338                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2339         }
2340
2341         return;
2342 }
2343
2344
2345 /* verify that all nodes are in normal recovery mode */
2346 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2347 {
2348         struct verify_recmode_normal_data *rmdata;
2349         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2350         struct ctdb_client_control_state *state;
2351         enum monitor_result status;
2352         int j;
2353         
2354         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2355         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2356         rmdata->count  = 0;
2357         rmdata->status = MONITOR_OK;
2358
2359         /* loop over all active nodes and send an async getrecmode call to 
2360            them*/
2361         for (j=0; j<nodemap->num; j++) {
2362                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2363                         continue;
2364                 }
2365                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2366                                         CONTROL_TIMEOUT(), 
2367                                         nodemap->nodes[j].pnn);
2368                 if (state == NULL) {
2369                         /* we failed to send the control, treat this as 
2370                            an error and try again next iteration
2371                         */                      
2372                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2373                         talloc_free(mem_ctx);
2374                         return MONITOR_FAILED;
2375                 }
2376
2377                 /* set up the callback functions */
2378                 state->async.fn = verify_recmode_normal_callback;
2379                 state->async.private_data = rmdata;
2380
2381                 /* one more control to wait for to complete */
2382                 rmdata->count++;
2383         }
2384
2385
2386         /* now wait for up to the maximum number of seconds allowed
2387            or until all nodes we expect a response from have replied
2388         */
2389         while (rmdata->count > 0) {
2390                 event_loop_once(ctdb->ev);
2391         }
2392
2393         status = rmdata->status;
2394         talloc_free(mem_ctx);
2395         return status;
2396 }
2397
2398
2399 struct verify_recmaster_data {
2400         struct ctdb_recoverd *rec;
2401         uint32_t count;
2402         uint32_t pnn;
2403         enum monitor_result status;
2404 };
2405
2406 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2407 {
2408         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2409
2410
2411         /* one more node has responded with recmaster data */
2412         rmdata->count--;
2413
2414         /* if we failed to get the recmaster, then return an error and let
2415            the main loop try again.
2416         */
2417         if (state->state != CTDB_CONTROL_DONE) {
2418                 if (rmdata->status == MONITOR_OK) {
2419                         rmdata->status = MONITOR_FAILED;
2420                 }
2421                 return;
2422         }
2423
2424         /* if we got a response, then the recmaster will be stored in the
2425            status field
2426         */
2427         if (state->status != rmdata->pnn) {
2428                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2429                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2430                 rmdata->status = MONITOR_ELECTION_NEEDED;
2431         }
2432
2433         return;
2434 }
2435
2436
2437 /* verify that all nodes agree that we are the recmaster */
2438 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2439 {
2440         struct ctdb_context *ctdb = rec->ctdb;
2441         struct verify_recmaster_data *rmdata;
2442         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2443         struct ctdb_client_control_state *state;
2444         enum monitor_result status;
2445         int j;
2446         
2447         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2448         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2449         rmdata->rec    = rec;
2450         rmdata->count  = 0;
2451         rmdata->pnn    = pnn;
2452         rmdata->status = MONITOR_OK;
2453
2454         /* loop over all active nodes and send an async getrecmaster call to 
2455            them*/
2456         for (j=0; j<nodemap->num; j++) {
2457                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2458                         continue;
2459                 }
2460                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2461                                         CONTROL_TIMEOUT(),
2462                                         nodemap->nodes[j].pnn);
2463                 if (state == NULL) {
2464                         /* we failed to send the control, treat this as 
2465                            an error and try again next iteration
2466                         */                      
2467                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2468                         talloc_free(mem_ctx);
2469                         return MONITOR_FAILED;
2470                 }
2471
2472                 /* set up the callback functions */
2473                 state->async.fn = verify_recmaster_callback;
2474                 state->async.private_data = rmdata;
2475
2476                 /* one more control to wait for to complete */
2477                 rmdata->count++;
2478         }
2479
2480
2481         /* now wait for up to the maximum number of seconds allowed
2482            or until all nodes we expect a response from have replied
2483         */
2484         while (rmdata->count > 0) {
2485                 event_loop_once(ctdb->ev);
2486         }
2487
2488         status = rmdata->status;
2489         talloc_free(mem_ctx);
2490         return status;
2491 }
2492
2493
2494 /* called to check that the local allocation of public ip addresses is ok.
2495 */
2496 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2497 {
2498         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2499         struct ctdb_control_get_ifaces *ifaces = NULL;
2500         struct ctdb_all_public_ips *ips = NULL;
2501         struct ctdb_uptime *uptime1 = NULL;
2502         struct ctdb_uptime *uptime2 = NULL;
2503         int ret, j;
2504         bool need_iface_check = false;
2505         bool need_takeover_run = false;
2506
2507         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2508                                 CTDB_CURRENT_NODE, &uptime1);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2511                 talloc_free(mem_ctx);
2512                 return -1;
2513         }
2514
2515
2516         /* read the interfaces from the local node */
2517         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2518         if (ret != 0) {
2519                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2520                 talloc_free(mem_ctx);
2521                 return -1;
2522         }
2523
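             /* compare the freshly read interface list against the copy saved
                from the previous iteration; any change forces a takeover run */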
2524         if (!rec->ifaces) {
2525                 need_iface_check = true;
2526         } else if (rec->ifaces->num != ifaces->num) {
2527                 need_iface_check = true;
2528         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2529                 need_iface_check = true;
2530         }
2531
2532         if (need_iface_check) {
2533                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2534                                      "local node %u - force takeover run\n",
2535                                      pnn));
2536                 need_takeover_run = true;
2537         }
2538
2539         /* read the ip allocation from the local node */
2540         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2543                 talloc_free(mem_ctx);
2544                 return -1;
2545         }
2546
2547         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2548                                 CTDB_CURRENT_NODE, &uptime2);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2551                 talloc_free(mem_ctx);
2552                 return -1;
2553         }
2554
2555         /* skip the check if the startrecovery time has changed */
2556         if (timeval_compare(&uptime1->last_recovery_started,
2557                             &uptime2->last_recovery_started) != 0) {
2558                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2559                 talloc_free(mem_ctx);
2560                 return 0;
2561         }
2562
2563         /* skip the check if the endrecovery time has changed */
2564         if (timeval_compare(&uptime1->last_recovery_finished,
2565                             &uptime2->last_recovery_finished) != 0) {
2566                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2567                 talloc_free(mem_ctx);
2568                 return 0;
2569         }
2570
2571         /* skip the check if we have started but not finished recovery */
2572         if (timeval_compare(&uptime1->last_recovery_finished,
2573                             &uptime1->last_recovery_started) != 1) {
2574                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2575                 talloc_free(mem_ctx);
2576
2577                 return 0;
2578         }
2579
2580         talloc_free(rec->ifaces);
2581         rec->ifaces = talloc_steal(rec, ifaces);
2582
2583         /* verify that we have the ip addresses we should have
2584            and we don't have ones we shouldn't have.
2585            if we find an inconsistency we ask the recovery master
2586            to trigger a takeover run so that the public ip
2587            allocation is corrected
2588         */
2589         for (j=0; j<ips->num; j++) {
2590                 if (ips->ips[j].pnn == pnn) {
2591                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2592                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2593                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2594                                 need_takeover_run = true;
2595                         }
2596                 } else {
2597                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2598                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2599                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2600                                 need_takeover_run = true;
2601                         }
2602                 }
2603         }
2604
2605         if (need_takeover_run) {
2606                 struct takeover_run_reply rd;
2607                 TDB_DATA data;
2608
2609                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2610
2611                 rd.pnn = ctdb->pnn;
2612                 rd.srvid = 0;
2613                 data.dptr = (uint8_t *)&rd;
2614                 data.dsize = sizeof(rd);
2615
2616                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2617                 if (ret != 0) {
2618                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2619                 }
2620         }
2621         talloc_free(mem_ctx);
2622         return 0;
2623 }
2624
2625
2626 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2627 {
2628         struct ctdb_node_map **remote_nodemaps = callback_data;
2629
2630         if (node_pnn >= ctdb->num_nodes) {
2631                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2632                 return;
2633         }
2634
2635         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2636
2637 }
2638
2639 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2640         struct ctdb_node_map *nodemap,
2641         struct ctdb_node_map **remote_nodemaps)
2642 {
2643         uint32_t *nodes;
2644
2645         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2646         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2647                                         nodes, 0,
2648                                         CONTROL_TIMEOUT(), false, tdb_null,
2649                                         async_getnodemap_callback,
2650                                         NULL,
2651                                         remote_nodemaps) != 0) {
2652                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2653
2654                 return -1;
2655         }
2656
2657         return 0;
2658 }
2659
2660 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2661 struct ctdb_check_reclock_state {
2662         struct ctdb_context *ctdb;
2663         struct timeval start_time;
2664         int fd[2];
2665         pid_t child;
2666         struct timed_event *te;
2667         struct fd_event *fde;
2668         enum reclock_child_status status;
2669 };
2670
2671 /* when we free the reclock state we must kill any child process.
2672 */
2673 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2674 {
2675         struct ctdb_context *ctdb = state->ctdb;
2676
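             /* report how long this reclock check took before the state
                (and any child process) is torn down */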
2677         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2678
2679         if (state->fd[0] != -1) {
2680                 close(state->fd[0]);
2681                 state->fd[0] = -1;
2682         }
2683         if (state->fd[1] != -1) {
2684                 close(state->fd[1]);
2685                 state->fd[1] = -1;
2686         }
2687         kill(state->child, SIGKILL);
2688         return 0;
2689 }
2690
2691 /*
2692   called if our check_reclock child times out. this would happen if
2693   i/o to the reclock file blocks.
2694  */
2695 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2696                                          struct timeval t, void *private_data)
2697 {
2698         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2699                                            struct ctdb_check_reclock_state);
2700
2701         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2702         state->status = RECLOCK_TIMEOUT;
2703 }
2704
2705 /* this is called when the child process has completed checking the reclock
2706    file and has written data back to us through the pipe.
2707 */
2708 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2709                              uint16_t flags, void *private_data)
2710 {
2711         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2712                                              struct ctdb_check_reclock_state);
2713         char c = 0;
2714         int ret;
2715
2716         /* we got a response from our child process so we can abort the
2717            timeout.
2718         */
2719         talloc_free(state->te);
2720         state->te = NULL;
2721
2722         ret = read(state->fd[0], &c, 1);
2723         if (ret != 1 || c != RECLOCK_OK) {
2724                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2725                 state->status = RECLOCK_FAILED;
2726
2727                 return;
2728         }
2729
2730         state->status = RECLOCK_OK;
2731         return;
2732 }
2733
2734 static int check_recovery_lock(struct ctdb_context *ctdb)
2735 {
2736         int ret;
2737         struct ctdb_check_reclock_state *state;
2738         pid_t parent = getpid();
2739
2740         if (ctdb->recovery_lock_fd == -1) {
2741                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2742                 return -1;
2743         }
2744
2745         state = talloc(ctdb, struct ctdb_check_reclock_state);
2746         CTDB_NO_MEMORY(ctdb, state);
2747
2748         state->ctdb = ctdb;
2749         state->start_time = timeval_current();
2750         state->status = RECLOCK_CHECKING;
2751         state->fd[0] = -1;
2752         state->fd[1] = -1;
2753
2754         ret = pipe(state->fd);
2755         if (ret != 0) {
2756                 talloc_free(state);
2757                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2758                 return -1;
2759         }
2760
2761         state->child = fork();
2762         if (state->child == (pid_t)-1) {
2763                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2764                 close(state->fd[0]);
2765                 state->fd[0] = -1;
2766                 close(state->fd[1]);
2767                 state->fd[1] = -1;
2768                 talloc_free(state);
2769                 return -1;
2770         }
2771
2772         if (state->child == 0) {
2773                 char cc = RECLOCK_OK;
2774                 close(state->fd[0]);
2775                 state->fd[0] = -1;
2776
2777                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2778                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2779                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2780                         cc = RECLOCK_FAILED;
2781                 }
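                /* Note: the reclock file is not expected to contain any data,
                   so a successful pread() normally returns 0 bytes and leaves
                   cc set to RECLOCK_OK; only an I/O error (-1) marks the
                   check as failed. */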
2782
2783                 write(state->fd[1], &cc, 1);
2784                 /* make sure we die when our parent dies */
2785                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2786                         sleep(5);
2787                         write(state->fd[1], &cc, 1);
2788                 }
2789                 _exit(0);
2790         }
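
        /* Parent: keep only the read end of the pipe and wait for the status
           byte from the child, guarded by the timed event set up below. */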
2791         close(state->fd[1]);
2792         state->fd[1] = -1;
2793         set_close_on_exec(state->fd[0]);
2794
2795         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2796
2797         talloc_set_destructor(state, check_reclock_destructor);
2798
2799         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2800                                     ctdb_check_reclock_timeout, state);
2801         if (state->te == NULL) {
2802                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2803                 talloc_free(state);
2804                 return -1;
2805         }
2806
2807         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2808                                 EVENT_FD_READ,
2809                                 reclock_child_handler,
2810                                 (void *)state);
2811
2812         if (state->fde == NULL) {
2813                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2814                 talloc_free(state);
2815                 return -1;
2816         }
2817         tevent_fd_set_auto_close(state->fde);
2818
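        /* Run the event loop synchronously until the child reports a result
           or the timed event fires.  Note that RECLOCK_TIMEOUT falls through
           below and is not treated as a failure; only RECLOCK_FAILED causes
           the recovery lock fd to be dropped. */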
2819         while (state->status == RECLOCK_CHECKING) {
2820                 event_loop_once(ctdb->ev);
2821         }
2822
2823         if (state->status == RECLOCK_FAILED) {
2824                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2825                 close(ctdb->recovery_lock_fd);
2826                 ctdb->recovery_lock_fd = -1;
2827                 talloc_free(state);
2828                 return -1;
2829         }
2830
2831         talloc_free(state);
2832         return 0;
2833 }
2834
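/* Fetch the current reclock file setting from the main daemon and keep the
   recovery daemon's cached copy in sync.  If the reclock has been disabled
   or its path has changed, any old lock fd is closed and recovery lock
   verification is switched off.
*/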
2835 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2836 {
2837         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2838         const char *reclockfile;
2839
2840         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2841                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2842                 talloc_free(tmp_ctx);
2843                 return -1;      
2844         }
2845
2846         if (reclockfile == NULL) {
2847                 if (ctdb->recovery_lock_file != NULL) {
2848                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2849                         talloc_free(ctdb->recovery_lock_file);
2850                         ctdb->recovery_lock_file = NULL;
2851                         if (ctdb->recovery_lock_fd != -1) {
2852                                 close(ctdb->recovery_lock_fd);
2853                                 ctdb->recovery_lock_fd = -1;
2854                         }
2855                 }
2856                 ctdb->tunable.verify_recovery_lock = 0;
2857                 talloc_free(tmp_ctx);
2858                 return 0;
2859         }
2860
2861         if (ctdb->recovery_lock_file == NULL) {
2862                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2863                 if (ctdb->recovery_lock_fd != -1) {
2864                         close(ctdb->recovery_lock_fd);
2865                         ctdb->recovery_lock_fd = -1;
2866                 }
2867                 talloc_free(tmp_ctx);
2868                 return 0;
2869         }
2870
2871
2872         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2873                 talloc_free(tmp_ctx);
2874                 return 0;
2875         }
2876
2877         talloc_free(ctdb->recovery_lock_file);
2878         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2879         ctdb->tunable.verify_recovery_lock = 0;
2880         if (ctdb->recovery_lock_fd != -1) {
2881                 close(ctdb->recovery_lock_fd);
2882                 ctdb->recovery_lock_fd = -1;
2883         }
2884
2885         talloc_free(tmp_ctx);
2886         return 0;
2887 }
2888
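/* One pass of the recovery daemon's monitoring logic.  Every node verifies
   that the main ctdbd is still alive, applies pending bans, refreshes the
   tunables and the recovery lock file and checks that a valid recovery
   master exists.  Only the recovery master then goes on to compare nodemaps,
   flags and vnnmaps across the cluster and triggers a recovery or an IP
   takeover run when it finds an inconsistency.
*/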
2889 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2890                       TALLOC_CTX *mem_ctx)
2891 {
2892         uint32_t pnn;
2893         struct ctdb_node_map *nodemap=NULL;
2894         struct ctdb_node_map *recmaster_nodemap=NULL;
2895         struct ctdb_node_map **remote_nodemaps=NULL;
2896         struct ctdb_vnn_map *vnnmap=NULL;
2897         struct ctdb_vnn_map *remote_vnnmap=NULL;
2898         int32_t debug_level;
2899         int i, j, ret;
2900
2901
2902
2903         /* verify that the main daemon is still running */
2904         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2905                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2906                 exit(-1);
2907         }
2908
2909         /* ping the local daemon to tell it we are alive */
2910         ctdb_ctrl_recd_ping(ctdb);
2911
2912         if (rec->election_timeout) {
2913                 /* an election is in progress */
2914                 return;
2915         }
2916
2917         /* read the debug level from the parent and update locally */
2918         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2919         if (ret !=0) {
2920                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2921                 return;
2922         }
2923         LogLevel = debug_level;
2924
2925
        /* We must check if we need to ban a node here, but we want to do this
           as early as possible, before we have pulled the node map from the
           local node. That is why the threshold of 20 is hardcoded here.
        */
2930         for (i=0; i<ctdb->num_nodes; i++) {
2931                 struct ctdb_banning_state *ban_state;
2932
2933                 if (ctdb->nodes[i]->ban_state == NULL) {
2934                         continue;
2935                 }
2936                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2937                 if (ban_state->count < 20) {
2938                         continue;
2939                 }
2940                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2941                         ctdb->nodes[i]->pnn, ban_state->count,
2942                         ctdb->tunable.recovery_ban_period));
2943                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2944                 ban_state->count = 0;
2945         }
2946
2947         /* get relevant tunables */
2948         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2949         if (ret != 0) {
2950                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2951                 return;
2952         }
2953
2954         /* get the current recovery lock file from the server */
2955         if (update_recovery_lock_file(ctdb) != 0) {
2956                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2957                 return;
2958         }
2959
        /* Make sure that if recovery lock verification becomes disabled,
           we close the file
        */
2963         if (ctdb->tunable.verify_recovery_lock == 0) {
2964                 if (ctdb->recovery_lock_fd != -1) {
2965                         close(ctdb->recovery_lock_fd);
2966                         ctdb->recovery_lock_fd = -1;
2967                 }
2968         }
2969
2970         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2971         if (pnn == (uint32_t)-1) {
2972                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2973                 return;
2974         }
2975
2976         /* get the vnnmap */
2977         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2978         if (ret != 0) {
2979                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2980                 return;
2981         }
2982
2983
2984         /* get number of nodes */
2985         if (rec->nodemap) {
2986                 talloc_free(rec->nodemap);
2987                 rec->nodemap = NULL;
2988                 nodemap=NULL;
2989         }
2990         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2991         if (ret != 0) {
2992                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2993                 return;
2994         }
2995         nodemap = rec->nodemap;
2996
2997         /* check which node is the recovery master */
2998         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2999         if (ret != 0) {
3000                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3001                 return;
3002         }
3003
3004         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3005         if (rec->recmaster != pnn) {
3006                 if (rec->ip_reallocate_ctx != NULL) {
3007                         talloc_free(rec->ip_reallocate_ctx);
3008                         rec->ip_reallocate_ctx = NULL;
3009                         rec->reallocate_callers = NULL;
3010                 }
3011         }
3012         /* if there are takeovers requested, perform it and notify the waiters */
3013         if (rec->reallocate_callers) {
3014                 process_ipreallocate_requests(ctdb, rec);
3015         }
3016
3017         if (rec->recmaster == (uint32_t)-1) {
                DEBUG(DEBUG_NOTICE,(__location__ " No recovery master has been set yet - forcing election\n"));
3019                 force_election(rec, pnn, nodemap);
3020                 return;
3021         }
3022
3023
        /* if the local daemon is STOPPED, we verify that the databases are
           also frozen and that the recmode is set to active
        */
3027         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3028                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3029                 if (ret != 0) {
3030                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3031                 }
3032                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
                        DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activating recovery mode and freezing databases\n"));
3034
3035                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3036                         if (ret != 0) {
3037                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3038                                 return;
3039                         }
3040                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3041                         if (ret != 0) {
3042                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3043
3044                                 return;
3045                         }
3046                         return;
3047                 }
3048         }
3049         /* If the local node is stopped, verify we are not the recmaster 
3050            and yield this role if so
3051         */
3052         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3053                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3054                 force_election(rec, pnn, nodemap);
3055                 return;
3056         }
3057         
        /* check that we (recovery daemon) and the local ctdb daemon
           agree on whether we are banned or not
        */
        /* TODO: this check is not implemented yet */
3062
3063         /* remember our own node flags */
3064         rec->node_flags = nodemap->nodes[pnn].flags;
3065
3066         /* count how many active nodes there are */
3067         rec->num_active    = 0;
3068         rec->num_connected = 0;
3069         for (i=0; i<nodemap->num; i++) {
3070                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3071                         rec->num_active++;
3072                 }
3073                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3074                         rec->num_connected++;
3075                 }
3076         }
3077
3078
3079         /* verify that the recmaster node is still active */
3080         for (j=0; j<nodemap->num; j++) {
3081                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3082                         break;
3083                 }
3084         }
3085
3086         if (j == nodemap->num) {
3087                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3088                 force_election(rec, pnn, nodemap);
3089                 return;
3090         }
3091
3092         /* if recovery master is disconnected we must elect a new recmaster */
3093         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3094                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3095                 force_election(rec, pnn, nodemap);
3096                 return;
3097         }
3098
        /* grab the nodemap from the recovery master to check if it is banned */
3100         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3101                                    mem_ctx, &recmaster_nodemap);
3102         if (ret != 0) {
3103                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3104                           nodemap->nodes[j].pnn));
3105                 return;
3106         }
3107
3108
3109         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3110                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3111                 force_election(rec, pnn, nodemap);
3112                 return;
3113         }
3114
3115
        /* verify that we have all the IP addresses we should have and that
         * we don't have addresses we shouldn't have.
         */
3119         if (ctdb->do_checkpublicip) {
3120                 if (rec->ip_check_disable_ctx == NULL) {
3121                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3122                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3123                         }
3124                 }
3125         }
3126
3127
3128         /* if we are not the recmaster then we do not need to check
3129            if recovery is needed
3130          */
3131         if (pnn != rec->recmaster) {
3132                 return;
3133         }
3134
3135
3136         /* ensure our local copies of flags are right */
3137         ret = update_local_flags(rec, nodemap);
3138         if (ret == MONITOR_ELECTION_NEEDED) {
                DEBUG(DEBUG_NOTICE,("update_local_flags() indicated that a re-election is needed.\n"));
3140                 force_election(rec, pnn, nodemap);
3141                 return;
3142         }
3143         if (ret != MONITOR_OK) {
3144                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3145                 return;
3146         }
3147
3148         if (ctdb->num_nodes != nodemap->num) {
3149                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3150                 reload_nodes_file(ctdb);
3151                 return;
3152         }
3153
3154         /* verify that all active nodes agree that we are the recmaster */
3155         switch (verify_recmaster(rec, nodemap, pnn)) {
3156         case MONITOR_RECOVERY_NEEDED:
3157                 /* can not happen */
3158                 return;
3159         case MONITOR_ELECTION_NEEDED:
3160                 force_election(rec, pnn, nodemap);
3161                 return;
3162         case MONITOR_OK:
3163                 break;
3164         case MONITOR_FAILED:
3165                 return;
3166         }
3167
3168
3169         if (rec->need_recovery) {
3170                 /* a previous recovery didn't finish */
3171                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3172                 return;
3173         }
3174
3175         /* verify that all active nodes are in normal mode 
3176            and not in recovery mode 
3177         */
3178         switch (verify_recmode(ctdb, nodemap)) {
3179         case MONITOR_RECOVERY_NEEDED:
3180                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3181                 return;
3182         case MONITOR_FAILED:
3183                 return;
3184         case MONITOR_ELECTION_NEEDED:
3185                 /* can not happen */
3186         case MONITOR_OK:
3187                 break;
3188         }
3189
3190
3191         if (ctdb->tunable.verify_recovery_lock != 0) {
3192                 /* we should have the reclock - check its not stale */
3193                 ret = check_recovery_lock(ctdb);
3194                 if (ret != 0) {
3195                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3196                         ctdb_set_culprit(rec, ctdb->pnn);
3197                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3198                         return;
3199                 }
3200         }
3201
3202         /* get the nodemap for all active remote nodes
3203          */
3204         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3205         if (remote_nodemaps == NULL) {
3206                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3207                 return;
3208         }
3209         for(i=0; i<nodemap->num; i++) {
3210                 remote_nodemaps[i] = NULL;
3211         }
3212         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3213                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3214                 return;
3215         } 
3216
3217         /* verify that all other nodes have the same nodemap as we have
3218         */
3219         for (j=0; j<nodemap->num; j++) {
3220                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3221                         continue;
3222                 }
3223
3224                 if (remote_nodemaps[j] == NULL) {
3225                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3226                         ctdb_set_culprit(rec, j);
3227
3228                         return;
3229                 }
3230
3231                 /* if the nodes disagree on how many nodes there are
3232                    then this is a good reason to try recovery
3233                  */
3234                 if (remote_nodemaps[j]->num != nodemap->num) {
3235                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3236                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3237                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3238                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3239                         return;
3240                 }
3241
3242                 /* if the nodes disagree on which nodes exist and are
3243                    active, then that is also a good reason to do recovery
3244                  */
3245                 for (i=0;i<nodemap->num;i++) {
3246                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3247                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3248                                           nodemap->nodes[j].pnn, i, 
3249                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3250                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3251                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3252                                             vnnmap);
3253                                 return;
3254                         }
3255                 }
3256
3257                 /* verify the flags are consistent
3258                 */
3259                 for (i=0; i<nodemap->num; i++) {
3260                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3261                                 continue;
3262                         }
3263                         
3264                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3265                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3266                                   nodemap->nodes[j].pnn, 
3267                                   nodemap->nodes[i].pnn, 
3268                                   remote_nodemaps[j]->nodes[i].flags,
                                  nodemap->nodes[i].flags));
3270                                 if (i == j) {
3271                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3272                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3273                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3274                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3275                                                     vnnmap);
3276                                         return;
3277                                 } else {
3278                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3279                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3280                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3281                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3282                                                     vnnmap);
3283                                         return;
3284                                 }
3285                         }
3286                 }
3287         }
3288
3289
3290         /* there better be the same number of lmasters in the vnn map
3291            as there are active nodes or we will have to do a recovery
3292          */
3293         if (vnnmap->size != rec->num_active) {
3294                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3295                           vnnmap->size, rec->num_active));
3296                 ctdb_set_culprit(rec, ctdb->pnn);
3297                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3298                 return;
3299         }
3300
3301         /* verify that all active nodes in the nodemap also exist in 
3302            the vnnmap.
3303          */
3304         for (j=0; j<nodemap->num; j++) {
3305                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3306                         continue;
3307                 }
3308                 if (nodemap->nodes[j].pnn == pnn) {
3309                         continue;
3310                 }
3311
3312                 for (i=0; i<vnnmap->size; i++) {
3313                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3314                                 break;
3315                         }
3316                 }
3317                 if (i == vnnmap->size) {
                        DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3319                                   nodemap->nodes[j].pnn));
3320                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3321                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3322                         return;
3323                 }
3324         }
3325
3326         
3327         /* verify that all other nodes have the same vnnmap
3328            and are from the same generation
3329          */
3330         for (j=0; j<nodemap->num; j++) {
3331                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3332                         continue;
3333                 }
3334                 if (nodemap->nodes[j].pnn == pnn) {
3335                         continue;
3336                 }
3337
3338                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3339                                           mem_ctx, &remote_vnnmap);
3340                 if (ret != 0) {
3341                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3342                                   nodemap->nodes[j].pnn));
3343                         return;
3344                 }
3345
3346                 /* verify the vnnmap generation is the same */
3347                 if (vnnmap->generation != remote_vnnmap->generation) {
3348                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3349                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3350                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3351                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3352                         return;
3353                 }
3354
3355                 /* verify the vnnmap size is the same */
3356                 if (vnnmap->size != remote_vnnmap->size) {
3357                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3358                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3359                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3360                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3361                         return;
3362                 }
3363
3364                 /* verify the vnnmap is the same */
3365                 for (i=0;i<vnnmap->size;i++) {
3366                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3367                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3368                                           nodemap->nodes[j].pnn));
3369                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3370                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3371                                             vnnmap);
3372                                 return;
3373                         }
3374                 }
3375         }
3376
3377         /* we might need to change who has what IP assigned */
3378         if (rec->need_takeover_run) {
3379                 uint32_t culprit = (uint32_t)-1;
3380
3381                 rec->need_takeover_run = false;
3382
3383                 /* update the list of public ips that a node can handle for
3384                    all connected nodes
3385                 */
3386                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3387                 if (ret != 0) {
3388                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3389                                          culprit));
3390                         ctdb_set_culprit(rec, culprit);
3391                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3392                         return;
3393                 }
3394
3395                 /* execute the "startrecovery" event script on all nodes */
3396                 ret = run_startrecovery_eventscript(rec, nodemap);
3397                 if (ret!=0) {
3398                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3399                         ctdb_set_culprit(rec, ctdb->pnn);
3400                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3401                         return;
3402                 }
3403
3404                 ret = ctdb_takeover_run(ctdb, nodemap);
3405                 if (ret != 0) {
3406                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3407                         ctdb_set_culprit(rec, ctdb->pnn);
3408                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3409                         return;
3410                 }
3411
3412                 /* execute the "recovered" event script on all nodes */
3413                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3414 #if 0
// we can't check whether the event completed successfully
3416 // since this script WILL fail if the node is in recovery mode
3417 // and if that race happens, the code here would just cause a second
3418 // cascading recovery.
3419                 if (ret!=0) {
3420                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3421                         ctdb_set_culprit(rec, ctdb->pnn);
3422                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3423                 }
3424 #endif
3425         }
3426 }
3427
3428 /*
3429   the main monitoring loop
3430  */
3431 static void monitor_cluster(struct ctdb_context *ctdb)
3432 {
3433         struct ctdb_recoverd *rec;
3434
3435         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3436
3437         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3438         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3439
3440         rec->ctdb = ctdb;
3441
3442         rec->priority_time = timeval_current();
3443
3444         /* register a message port for sending memory dumps */
3445         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3446
3447         /* register a message port for recovery elections */
3448         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3449
3450         /* when nodes are disabled/enabled */
3451         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3452
        /* when we are asked to push out a flag change */
3454         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3455
3456         /* register a message port for vacuum fetch */
3457         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3458
3459         /* register a message port for reloadnodes  */
3460         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3461
3462         /* register a message port for performing a takeover run */
3463         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3464
3465         /* register a message port for disabling the ip check for a short while */
3466         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3467
3468         /* register a message port for updating the recovery daemons node assignment for an ip */
3469         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3470
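        /* Main loop: run one monitoring pass per iteration on a fresh
           temporary talloc context, then sleep for whatever is left of
           recover_interval so a pass runs at most once per interval. */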
3471         for (;;) {
3472                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3473                 struct timeval start;
3474                 double elapsed;
3475
3476                 if (!mem_ctx) {
3477                         DEBUG(DEBUG_CRIT,(__location__
3478                                           " Failed to create temp context\n"));
3479                         exit(-1);
3480                 }
3481
3482                 start = timeval_current();
3483                 main_loop(ctdb, rec, mem_ctx);
3484                 talloc_free(mem_ctx);
3485
                /* we only check for recovery at most once per recover_interval */
3487                 elapsed = timeval_elapsed(&start);
3488                 if (elapsed < ctdb->tunable.recover_interval) {
3489                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3490                                           - elapsed);
3491                 }
3492         }
3493 }
3494
3495 /*
3496   event handler for when the main ctdbd dies
3497  */
3498 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3499                                  uint16_t flags, void *private_data)
3500 {
3501         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3502         _exit(1);
3503 }
3504
3505 /*
3506   called regularly to verify that the recovery daemon is still running
3507  */
3508 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3509                               struct timeval yt, void *p)
3510 {
3511         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3512
3513         if (kill(ctdb->recoverd_pid, 0) != 0) {
3514                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3515
3516                 ctdb_stop_recoverd(ctdb);
3517                 ctdb_stop_keepalive(ctdb);
3518                 ctdb_stop_monitoring(ctdb);
3519                 ctdb_release_all_ips(ctdb);
3520                 if (ctdb->methods != NULL) {
3521                         ctdb->methods->shutdown(ctdb);
3522                 }
3523                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3524
3525                 exit(10);       
3526         }
3527
3528         event_add_timed(ctdb->ev, ctdb, 
3529                         timeval_current_ofs(30, 0),
3530                         ctdb_check_recd, ctdb);
3531 }
3532
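/* SIGCHLD handler for the recovery daemon: reap all exited children so that
   helper processes (such as the reclock check child) do not linger as
   zombies.
*/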
3533 static void recd_sig_child_handler(struct event_context *ev,
3534         struct signal_event *se, int signum, int count,
3535         void *dont_care, 
3536         void *private_data)
3537 {
3538 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3539         int status;
3540         pid_t pid = -1;
3541
3542         while (pid != 0) {
3543                 pid = waitpid(-1, &status, WNOHANG);
3544                 if (pid == -1) {
3545                         if (errno != ECHILD) {
3546                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3547                         }
3548                         return;
3549                 }
3550                 if (pid > 0) {
3551                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3552                 }
3553         }
3554 }
3555
3556 /*
3557   startup the recovery daemon as a child of the main ctdb daemon
3558  */
3559 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3560 {
3561         int fd[2];
3562         struct signal_event *se;
3563         struct tevent_fd *fde;
3564
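        /* This pipe is only used for lifetime tracking: the parent keeps the
           write end open without ever writing to it, while the child watches
           the read end.  When the main daemon exits the read end becomes
           readable (EOF) and ctdb_recoverd_parent() terminates the recovery
           daemon. */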
3565         if (pipe(fd) != 0) {
3566                 return -1;
3567         }
3568
3569         ctdb->ctdbd_pid = getpid();
3570
3571         ctdb->recoverd_pid = fork();
3572         if (ctdb->recoverd_pid == -1) {
3573                 return -1;
3574         }
3575         
3576         if (ctdb->recoverd_pid != 0) {
3577                 close(fd[0]);
3578                 event_add_timed(ctdb->ev, ctdb, 
3579                                 timeval_current_ofs(30, 0),
3580                                 ctdb_check_recd, ctdb);
3581                 return 0;
3582         }
3583
3584         close(fd[1]);
3585
3586         srandom(getpid() ^ time(NULL));
3587
3588         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3590                 exit(1);
3591         }
3592
3593         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3594
3595         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3596                      ctdb_recoverd_parent, &fd[0]);     
3597         tevent_fd_set_auto_close(fde);
3598
3599         /* set up a handler to pick up sigchld */
3600         se = event_add_signal(ctdb->ev, ctdb,
3601                                      SIGCHLD, 0,
3602                                      recd_sig_child_handler,
3603                                      ctdb);
3604         if (se == NULL) {
3605                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3606                 exit(1);
3607         }
3608
3609         monitor_cluster(ctdb);
3610
3611         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3612         return -1;
3613 }
3614
3615 /*
3616   shutdown the recovery daemon
3617  */
3618 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3619 {
3620         if (ctdb->recoverd_pid == 0) {
3621                 return;
3622         }
3623
3624         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3625         kill(ctdb->recoverd_pid, SIGTERM);
3626 }