Merge branch 'master-readonly-records' into foo
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/*
  One entry per "ctdb ipreallocate" process that is to be called back
  once the takeover run has finished.  Entries are chained through
  "next".
 */
struct ip_reallocate_list {
	/* following entry in the list */
	struct ip_reallocate_list *next;

	/* NOTE(review): presumably identifies where the reply for this
	   caller is to be sent - confirm against the code that fills it in */
	struct rd_memdump_reply *rd;
};
41
/*
  Per-node record of how often, and how recently, a node has been
  reported as a culprit (maintained by ctdb_set_culprit_count()).
 */
struct ctdb_banning_state {
	/* culprit credits accumulated so far */
	uint32_t count;

	/* time of the most recent report against the node */
	struct timeval last_reported_time;
};
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
/*
  Timeout helpers built from the recover_timeout / recover_interval
  tunables.  Both macros expand to an expression that dereferences a
  variable named "ctdb", so they can only be used where such a variable
  is in scope.
 */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* forward declaration of a timed-event handler */
static void ctdb_restart_recd(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
/* possible outcomes of a monitoring step */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit > ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
/*
  Remember the trouble maker: charge node "culprit" with a single
  culprit credit.
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t single_credit = 1;

	ctdb_set_culprit_count(rec, culprit, single_credit);
}
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222 }
223
224 /*
225   update the node capabilities for all connected nodes
226  */
227 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         uint32_t *nodes;
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
237                                         nodes, 0,
238                                         CONTROL_TIMEOUT(),
239                                         false, tdb_null,
240                                         async_getcap_callback, NULL,
241                                         NULL) != 0) {
242                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
243                 talloc_free(tmp_ctx);
244                 return -1;
245         }
246
247         talloc_free(tmp_ctx);
248         return 0;
249 }
250
251 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
256         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
257 }
258
259 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
260 {
261         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
262
263         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
264         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
265 }
266
267 /*
268   change recovery mode on all nodes
269  */
270 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
271 {
272         TDB_DATA data;
273         uint32_t *nodes;
274         TALLOC_CTX *tmp_ctx;
275
276         tmp_ctx = talloc_new(ctdb);
277         CTDB_NO_MEMORY(ctdb, tmp_ctx);
278
279         /* freeze all nodes */
280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 int i;
283
284                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
285                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
286                                                 nodes, i,
287                                                 CONTROL_TIMEOUT(),
288                                                 false, tdb_null,
289                                                 NULL,
290                                                 set_recmode_fail_callback,
291                                                 rec) != 0) {
292                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
293                                 talloc_free(tmp_ctx);
294                                 return -1;
295                         }
296                 }
297         }
298
299
300         data.dsize = sizeof(uint32_t);
301         data.dptr = (unsigned char *)&rec_mode;
302
303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
304                                         nodes, 0,
305                                         CONTROL_TIMEOUT(),
306                                         false, data,
307                                         NULL, NULL,
308                                         NULL) != 0) {
309                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
310                 talloc_free(tmp_ctx);
311                 return -1;
312         }
313
314         talloc_free(tmp_ctx);
315         return 0;
316 }
317
318 /*
319   change recovery master on all node
320  */
321 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
322 {
323         TDB_DATA data;
324         TALLOC_CTX *tmp_ctx;
325         uint32_t *nodes;
326
327         tmp_ctx = talloc_new(ctdb);
328         CTDB_NO_MEMORY(ctdb, tmp_ctx);
329
330         data.dsize = sizeof(uint32_t);
331         data.dptr = (unsigned char *)&pnn;
332
333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
335                                         nodes, 0,
336                                         CONTROL_TIMEOUT(), false, data,
337                                         NULL, NULL,
338                                         NULL) != 0) {
339                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
340                 talloc_free(tmp_ctx);
341                 return -1;
342         }
343
344         talloc_free(tmp_ctx);
345         return 0;
346 }
347
348 /* update all remote nodes to use the same db priority that we have
349    this can fail if the remove node has not yet been upgraded to 
350    support this function, so we always return success and never fail
351    a recovery if this call fails.
352 */
353 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
354         struct ctdb_node_map *nodemap, 
355         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int db;
358         uint32_t *nodes;
359
360         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
361
362         /* step through all local databases */
363         for (db=0; db<dbmap->num;db++) {
364                 TDB_DATA data;
365                 struct ctdb_db_priority db_prio;
366                 int ret;
367
368                 db_prio.db_id     = dbmap->dbs[db].dbid;
369                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
372                         continue;
373                 }
374
375                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
376
377                 data.dptr  = (uint8_t *)&db_prio;
378                 data.dsize = sizeof(db_prio);
379
380                 if (ctdb_client_async_control(ctdb,
381                                         CTDB_CONTROL_SET_DB_PRIORITY,
382                                         nodes, 0,
383                                         CONTROL_TIMEOUT(), false, data,
384                                         NULL, NULL,
385                                         NULL) != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
387                 }
388         }
389
390         return 0;
391 }                       
392
393 /*
394   ensure all other nodes have attached to any databases that we have
395  */
396 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
397                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
398 {
399         int i, j, db, ret;
400         struct ctdb_dbid_map *remote_dbmap;
401
402         /* verify that all other nodes have all our databases */
403         for (j=0; j<nodemap->num; j++) {
404                 /* we dont need to ourself ourselves */
405                 if (nodemap->nodes[j].pnn == pnn) {
406                         continue;
407                 }
408                 /* dont check nodes that are unavailable */
409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
410                         continue;
411                 }
412
413                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
414                                          mem_ctx, &remote_dbmap);
415                 if (ret != 0) {
416                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
417                         return -1;
418                 }
419
420                 /* step through all local databases */
421                 for (db=0; db<dbmap->num;db++) {
422                         const char *name;
423
424
425                         for (i=0;i<remote_dbmap->num;i++) {
426                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
427                                         break;
428                                 }
429                         }
430                         /* the remote node already have this database */
431                         if (i!=remote_dbmap->num) {
432                                 continue;
433                         }
434                         /* ok so we need to create this database */
435                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
436                                             mem_ctx, &name);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
439                                 return -1;
440                         }
441                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                            mem_ctx, name,
443                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
444                         if (ret != 0) {
445                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
446                                 return -1;
447                         }
448                 }
449         }
450
451         return 0;
452 }
453
454
455 /*
456   ensure we are attached to any databases that anyone else is attached to
457  */
458 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
459                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
460 {
461         int i, j, db, ret;
462         struct ctdb_dbid_map *remote_dbmap;
463
464         /* verify that we have all database any other node has */
465         for (j=0; j<nodemap->num; j++) {
466                 /* we dont need to ourself ourselves */
467                 if (nodemap->nodes[j].pnn == pnn) {
468                         continue;
469                 }
470                 /* dont check nodes that are unavailable */
471                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
472                         continue;
473                 }
474
475                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
476                                          mem_ctx, &remote_dbmap);
477                 if (ret != 0) {
478                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
479                         return -1;
480                 }
481
482                 /* step through all databases on the remote node */
483                 for (db=0; db<remote_dbmap->num;db++) {
484                         const char *name;
485
486                         for (i=0;i<(*dbmap)->num;i++) {
487                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
488                                         break;
489                                 }
490                         }
491                         /* we already have this db locally */
492                         if (i!=(*dbmap)->num) {
493                                 continue;
494                         }
495                         /* ok so we need to create this database and
496                            rebuild dbmap
497                          */
498                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
499                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
500                         if (ret != 0) {
501                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
502                                           nodemap->nodes[j].pnn));
503                                 return -1;
504                         }
505                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
506                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
507                         if (ret != 0) {
508                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
509                                 return -1;
510                         }
511                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
512                         if (ret != 0) {
513                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
514                                 return -1;
515                         }
516                 }
517         }
518
519         return 0;
520 }
521
522
523 /*
524   pull the remote database contents from one node into the recdb
525  */
526 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
527                                     struct tdb_wrap *recdb, uint32_t dbid,
528                                     bool persistent)
529 {
530         int ret;
531         TDB_DATA outdata;
532         struct ctdb_marshall_buffer *reply;
533         struct ctdb_rec_data *rec;
534         int i;
535         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
536
537         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
538                                CONTROL_TIMEOUT(), &outdata);
539         if (ret != 0) {
540                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
541                 talloc_free(tmp_ctx);
542                 return -1;
543         }
544
545         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
546
547         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
548                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552         
553         rec = (struct ctdb_rec_data *)&reply->data[0];
554         
555         for (i=0;
556              i<reply->count;
557              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
558                 TDB_DATA key, data;
559                 struct ctdb_ltdb_header *hdr;
560                 TDB_DATA existing;
561                 
562                 key.dptr = &rec->data[0];
563                 key.dsize = rec->keylen;
564                 data.dptr = &rec->data[key.dsize];
565                 data.dsize = rec->datalen;
566                 
567                 hdr = (struct ctdb_ltdb_header *)data.dptr;
568
569                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
570                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
571                         talloc_free(tmp_ctx);
572                         return -1;
573                 }
574
575                 /* fetch the existing record, if any */
576                 existing = tdb_fetch(recdb->tdb, key);
577                 
578                 if (existing.dptr != NULL) {
579                         struct ctdb_ltdb_header header;
580                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
581                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
582                                          (unsigned)existing.dsize, srcnode));
583                                 free(existing.dptr);
584                                 talloc_free(tmp_ctx);
585                                 return -1;
586                         }
587                         header = *(struct ctdb_ltdb_header *)existing.dptr;
588                         free(existing.dptr);
589                         if (!(header.rsn < hdr->rsn ||
590                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
591                                 continue;
592                         }
593                 }
594                 
595                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
596                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
597                         talloc_free(tmp_ctx);
598                         return -1;                              
599                 }
600         }
601
602         talloc_free(tmp_ctx);
603
604         return 0;
605 }
606
607 /*
608   pull all the remote database contents into the recdb
609  */
610 static int pull_remote_database(struct ctdb_context *ctdb,
611                                 struct ctdb_recoverd *rec, 
612                                 struct ctdb_node_map *nodemap, 
613                                 struct tdb_wrap *recdb, uint32_t dbid,
614                                 bool persistent)
615 {
616         int j;
617
618         /* pull all records from all other nodes across onto this node
619            (this merges based on rsn)
620         */
621         for (j=0; j<nodemap->num; j++) {
622                 /* dont merge from nodes that are unavailable */
623                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
624                         continue;
625                 }
626                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
627                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
628                                  nodemap->nodes[j].pnn));
629                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
630                         return -1;
631                 }
632         }
633         
634         return 0;
635 }
636
637
638 /*
639   update flags on all active nodes
640  */
641 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
642 {
643         int ret;
644
645         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
646                 if (ret != 0) {
647                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
648                 return -1;
649         }
650
651         return 0;
652 }
653
654 /*
655   ensure all nodes have the same vnnmap we do
656  */
657 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
658                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
659 {
660         int j, ret;
661
662         /* push the new vnn map out to all the nodes */
663         for (j=0; j<nodemap->num; j++) {
664                 /* dont push to nodes that are unavailable */
665                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
666                         continue;
667                 }
668
669                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
670                 if (ret != 0) {
671                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
672                         return -1;
673                 }
674         }
675
676         return 0;
677 }
678
679
/*
  State of one batch of vacuum-fetch records that is being worked
  through.  Instances are linked into rec->vacuum_info.
 */
struct vacuum_info {
	/* doubly linked list pointers */
	struct vacuum_info *next;
	struct vacuum_info *prev;

	/* owning recovery daemon state */
	struct ctdb_recoverd *rec;

	/* node the records were received from */
	uint32_t srcnode;

	/* database the records belong to */
	struct ctdb_db_context *ctdb_db;

	/* the record batch; recs->count is the number of records that
	   have not been consumed yet */
	struct ctdb_marshall_buffer *recs;

	/* cursor: the next record in recs to process */
	struct ctdb_rec_data *r;
};
688
689 static void vacuum_fetch_next(struct vacuum_info *v);
690
691 /*
692   called when a vacuum fetch has completed - just free it and do the next one
693  */
694 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
695 {
696         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
697         talloc_free(state);
698         vacuum_fetch_next(v);
699 }
700
701
702 /*
703   process the next element from the vacuum list
704 */
705 static void vacuum_fetch_next(struct vacuum_info *v)
706 {
707         struct ctdb_call call;
708         struct ctdb_rec_data *r;
709
710         while (v->recs->count) {
711                 struct ctdb_client_call_state *state;
712                 TDB_DATA data;
713                 struct ctdb_ltdb_header *hdr;
714
715                 ZERO_STRUCT(call);
716                 call.call_id = CTDB_NULL_FUNC;
717                 call.flags = CTDB_IMMEDIATE_MIGRATION;
718                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
719
720                 r = v->r;
721                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
722                 v->recs->count--;
723
724                 call.key.dptr = &r->data[0];
725                 call.key.dsize = r->keylen;
726
727                 /* ensure we don't block this daemon - just skip a record if we can't get
728                    the chainlock */
729                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
730                         continue;
731                 }
732
733                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
734                 if (data.dptr == NULL) {
735                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
736                         continue;
737                 }
738
739                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
740                         free(data.dptr);
741                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
742                         continue;
743                 }
744                 
745                 hdr = (struct ctdb_ltdb_header *)data.dptr;
746                 if (hdr->dmaster == v->rec->ctdb->pnn) {
747                         /* its already local */
748                         free(data.dptr);
749                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
750                         continue;
751                 }
752
753                 free(data.dptr);
754
755                 state = ctdb_call_send(v->ctdb_db, &call);
756                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
757                 if (state == NULL) {
758                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
759                         talloc_free(v);
760                         return;
761                 }
762                 state->async.fn = vacuum_fetch_callback;
763                 state->async.private_data = v;
764                 return;
765         }
766
767         talloc_free(v);
768 }
769
770
771 /*
772   destroy a vacuum info structure
773  */
774 static int vacuum_info_destructor(struct vacuum_info *v)
775 {
776         DLIST_REMOVE(v->rec->vacuum_info, v);
777         return 0;
778 }
779
780
781 /*
782   handler for vacuum fetch
783 */
784 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
785                                  TDB_DATA data, void *private_data)
786 {
787         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
788         struct ctdb_marshall_buffer *recs;
789         int ret, i;
790         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
791         const char *name;
792         struct ctdb_dbid_map *dbmap=NULL;
793         bool persistent = false;
794         struct ctdb_db_context *ctdb_db;
795         struct ctdb_rec_data *r;
796         uint32_t srcnode;
797         struct vacuum_info *v;
798
799         recs = (struct ctdb_marshall_buffer *)data.dptr;
800         r = (struct ctdb_rec_data *)&recs->data[0];
801
802         if (recs->count == 0) {
803                 talloc_free(tmp_ctx);
804                 return;
805         }
806
807         srcnode = r->reqid;
808
809         for (v=rec->vacuum_info;v;v=v->next) {
810                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
811                         /* we're already working on records from this node */
812                         talloc_free(tmp_ctx);
813                         return;
814                 }
815         }
816
817         /* work out if the database is persistent */
818         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
819         if (ret != 0) {
820                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
821                 talloc_free(tmp_ctx);
822                 return;
823         }
824
825         for (i=0;i<dbmap->num;i++) {
826                 if (dbmap->dbs[i].dbid == recs->db_id) {
827                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
828                         break;
829                 }
830         }
831         if (i == dbmap->num) {
832                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
833                 talloc_free(tmp_ctx);
834                 return;         
835         }
836
837         /* find the name of this database */
838         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
839                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
840                 talloc_free(tmp_ctx);
841                 return;
842         }
843
844         /* attach to it */
845         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
846         if (ctdb_db == NULL) {
847                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
848                 talloc_free(tmp_ctx);
849                 return;
850         }
851
852         v = talloc_zero(rec, struct vacuum_info);
853         if (v == NULL) {
854                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
855                 talloc_free(tmp_ctx);
856                 return;
857         }
858
859         v->rec = rec;
860         v->srcnode = srcnode;
861         v->ctdb_db = ctdb_db;
862         v->recs = talloc_memdup(v, recs, data.dsize);
863         if (v->recs == NULL) {
864                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
865                 talloc_free(v);
866                 talloc_free(tmp_ctx);
867                 return;         
868         }
869         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
870
871         DLIST_ADD(rec->vacuum_info, v);
872
873         talloc_set_destructor(v, vacuum_info_destructor);
874
875         vacuum_fetch_next(v);
876         talloc_free(tmp_ctx);
877 }
878
879
/*
  timer callback used by ctdb_wait_timeout: reports that the wait is
  over by storing 1 in the uint32_t that p points to
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
                              struct timeval yt, void *p)
{
        uint32_t *expired = p;

        *expired = 1;
}
889
890 /*
891   wait for a given number of seconds
892  */
893 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
894 {
895         uint32_t timed_out = 0;
896         time_t usecs = (secs - (time_t)secs) * 1000000;
897         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
898         while (!timed_out) {
899                 event_loop_once(ctdb->ev);
900         }
901 }
902
903 /*
904   called when an election times out (ends)
905  */
906 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
907                                   struct timeval t, void *p)
908 {
909         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
910         rec->election_timeout = NULL;
911         fast_start = false;
912
913         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
914 }
915
916
917 /*
918   wait for an election to finish. It finished election_timeout seconds after
919   the last election packet is received
920  */
921 static void ctdb_wait_election(struct ctdb_recoverd *rec)
922 {
923         struct ctdb_context *ctdb = rec->ctdb;
924         while (rec->election_timeout) {
925                 event_loop_once(ctdb->ev);
926         }
927 }
928
929 /*
930   Update our local flags from all remote connected nodes. 
931   This is only run when we are or we belive we are the recovery master
932  */
933 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
934 {
935         int j;
936         struct ctdb_context *ctdb = rec->ctdb;
937         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
938
939         /* get the nodemap for all active remote nodes and verify
940            they are the same as for this node
941          */
942         for (j=0; j<nodemap->num; j++) {
943                 struct ctdb_node_map *remote_nodemap=NULL;
944                 int ret;
945
946                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
947                         continue;
948                 }
949                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
950                         continue;
951                 }
952
953                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
954                                            mem_ctx, &remote_nodemap);
955                 if (ret != 0) {
956                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
957                                   nodemap->nodes[j].pnn));
958                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
959                         talloc_free(mem_ctx);
960                         return MONITOR_FAILED;
961                 }
962                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
963                         /* We should tell our daemon about this so it
964                            updates its flags or else we will log the same 
965                            message again in the next iteration of recovery.
966                            Since we are the recovery master we can just as
967                            well update the flags on all nodes.
968                         */
969                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
970                         if (ret != 0) {
971                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
972                                 return -1;
973                         }
974
975                         /* Update our local copy of the flags in the recovery
976                            daemon.
977                         */
978                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
979                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
980                                  nodemap->nodes[j].flags));
981                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
982                 }
983                 talloc_free(remote_nodemap);
984         }
985         talloc_free(mem_ctx);
986         return MONITOR_OK;
987 }
988
989
990 /* Create a new random generation ip. 
991    The generation id can not be the INVALID_GENERATION id
992 */
993 static uint32_t new_generation(void)
994 {
995         uint32_t generation;
996
997         while (1) {
998                 generation = random();
999
1000                 if (generation != INVALID_GENERATION) {
1001                         break;
1002                 }
1003         }
1004
1005         return generation;
1006 }
1007
1008
1009 /*
1010   create a temporary working database
1011  */
1012 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1013 {
1014         char *name;
1015         struct tdb_wrap *recdb;
1016         unsigned tdb_flags;
1017
1018         /* open up the temporary recovery database */
1019         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1020                                ctdb->db_directory_state,
1021                                ctdb->pnn);
1022         if (name == NULL) {
1023                 return NULL;
1024         }
1025         unlink(name);
1026
1027         tdb_flags = TDB_NOLOCK;
1028         if (ctdb->valgrinding) {
1029                 tdb_flags |= TDB_NOMMAP;
1030         }
1031         tdb_flags |= TDB_DISALLOW_NESTING;
1032
1033         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1034                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1035         if (recdb == NULL) {
1036                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1037         }
1038
1039         talloc_free(name);
1040
1041         return recdb;
1042 }
1043
1044
1045 /* 
1046    a traverse function for pulling all relevent records from recdb
1047  */
1048 struct recdb_data {
1049         struct ctdb_context *ctdb;
1050         struct ctdb_marshall_buffer *recdata;
1051         uint32_t len;
1052         bool failed;
1053         bool persistent;
1054 };
1055
1056 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1057 {
1058         struct recdb_data *params = (struct recdb_data *)p;
1059         struct ctdb_rec_data *rec;
1060         struct ctdb_ltdb_header *hdr;
1061
1062         /* skip empty records */
1063         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1064                 return 0;
1065         }
1066
1067         /* update the dmaster field to point to us */
1068         hdr = (struct ctdb_ltdb_header *)data.dptr;
1069         if (!params->persistent) {
1070                 hdr->dmaster = params->ctdb->pnn;
1071                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1072         }
1073
1074         /* add the record to the blob ready to send to the nodes */
1075         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1076         if (rec == NULL) {
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1081         if (params->recdata == NULL) {
1082                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1083                          rec->length + params->len, params->recdata->count));
1084                 params->failed = true;
1085                 return -1;
1086         }
1087         params->recdata->count++;
1088         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1089         params->len += rec->length;
1090         talloc_free(rec);
1091
1092         return 0;
1093 }
1094
1095 /*
1096   push the recdb database out to all nodes
1097  */
1098 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1099                                bool persistent,
1100                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1101 {
1102         struct recdb_data params;
1103         struct ctdb_marshall_buffer *recdata;
1104         TDB_DATA outdata;
1105         TALLOC_CTX *tmp_ctx;
1106         uint32_t *nodes;
1107
1108         tmp_ctx = talloc_new(ctdb);
1109         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1110
1111         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1112         CTDB_NO_MEMORY(ctdb, recdata);
1113
1114         recdata->db_id = dbid;
1115
1116         params.ctdb = ctdb;
1117         params.recdata = recdata;
1118         params.len = offsetof(struct ctdb_marshall_buffer, data);
1119         params.failed = false;
1120         params.persistent = persistent;
1121
1122         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1124                 talloc_free(params.recdata);
1125                 talloc_free(tmp_ctx);
1126                 return -1;
1127         }
1128
1129         if (params.failed) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1131                 talloc_free(params.recdata);
1132                 talloc_free(tmp_ctx);
1133                 return -1;              
1134         }
1135
1136         recdata = params.recdata;
1137
1138         outdata.dptr = (void *)recdata;
1139         outdata.dsize = params.len;
1140
1141         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1142         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1143                                         nodes, 0,
1144                                         CONTROL_TIMEOUT(), false, outdata,
1145                                         NULL, NULL,
1146                                         NULL) != 0) {
1147                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1148                 talloc_free(recdata);
1149                 talloc_free(tmp_ctx);
1150                 return -1;
1151         }
1152
1153         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1154                   dbid, recdata->count));
1155
1156         talloc_free(recdata);
1157         talloc_free(tmp_ctx);
1158
1159         return 0;
1160 }
1161
1162
1163 /*
1164   go through a full recovery on one database 
1165  */
1166 static int recover_database(struct ctdb_recoverd *rec, 
1167                             TALLOC_CTX *mem_ctx,
1168                             uint32_t dbid,
1169                             bool persistent,
1170                             uint32_t pnn, 
1171                             struct ctdb_node_map *nodemap,
1172                             uint32_t transaction_id)
1173 {
1174         struct tdb_wrap *recdb;
1175         int ret;
1176         struct ctdb_context *ctdb = rec->ctdb;
1177         TDB_DATA data;
1178         struct ctdb_control_wipe_database w;
1179         uint32_t *nodes;
1180
1181         recdb = create_recdb(ctdb, mem_ctx);
1182         if (recdb == NULL) {
1183                 return -1;
1184         }
1185
1186         /* pull all remote databases onto the recdb */
1187         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1188         if (ret != 0) {
1189                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1190                 return -1;
1191         }
1192
1193         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1194
1195         /* wipe all the remote databases. This is safe as we are in a transaction */
1196         w.db_id = dbid;
1197         w.transaction_id = transaction_id;
1198
1199         data.dptr = (void *)&w;
1200         data.dsize = sizeof(w);
1201
1202         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1204                                         nodes, 0,
1205                                         CONTROL_TIMEOUT(), false, data,
1206                                         NULL, NULL,
1207                                         NULL) != 0) {
1208                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1209                 talloc_free(recdb);
1210                 return -1;
1211         }
1212         
1213         /* push out the correct database. This sets the dmaster and skips 
1214            the empty records */
1215         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1216         if (ret != 0) {
1217                 talloc_free(recdb);
1218                 return -1;
1219         }
1220
1221         /* all done with this database */
1222         talloc_free(recdb);
1223
1224         return 0;
1225 }
1226
1227 /*
1228   reload the nodes file 
1229 */
1230 static void reload_nodes_file(struct ctdb_context *ctdb)
1231 {
1232         ctdb->nodes = NULL;
1233         ctdb_load_nodes_file(ctdb);
1234 }
1235
1236 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1237                                          struct ctdb_recoverd *rec,
1238                                          struct ctdb_node_map *nodemap,
1239                                          uint32_t *culprit)
1240 {
1241         int j;
1242         int ret;
1243
1244         if (ctdb->num_nodes != nodemap->num) {
1245                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1246                                   ctdb->num_nodes, nodemap->num));
1247                 if (culprit) {
1248                         *culprit = ctdb->pnn;
1249                 }
1250                 return -1;
1251         }
1252
1253         for (j=0; j<nodemap->num; j++) {
1254                 /* release any existing data */
1255                 if (ctdb->nodes[j]->known_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->known_public_ips);
1257                         ctdb->nodes[j]->known_public_ips = NULL;
1258                 }
1259                 if (ctdb->nodes[j]->available_public_ips) {
1260                         talloc_free(ctdb->nodes[j]->available_public_ips);
1261                         ctdb->nodes[j]->available_public_ips = NULL;
1262                 }
1263
1264                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1265                         continue;
1266                 }
1267
1268                 /* grab a new shiny list of public ips from the node */
1269                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1270                                         CONTROL_TIMEOUT(),
1271                                         ctdb->nodes[j]->pnn,
1272                                         ctdb->nodes,
1273                                         0,
1274                                         &ctdb->nodes[j]->known_public_ips);
1275                 if (ret != 0) {
1276                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1277                                 ctdb->nodes[j]->pnn));
1278                         if (culprit) {
1279                                 *culprit = ctdb->nodes[j]->pnn;
1280                         }
1281                         return -1;
1282                 }
1283
1284                 if (ctdb->tunable.disable_ip_failover == 0) {
1285                         if (rec->ip_check_disable_ctx == NULL) {
1286                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1287                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1288                                         rec->need_takeover_run = true;
1289                                 }
1290                         }
1291                 }
1292
1293                 /* grab a new shiny list of public ips from the node */
1294                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1295                                         CONTROL_TIMEOUT(),
1296                                         ctdb->nodes[j]->pnn,
1297                                         ctdb->nodes,
1298                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1299                                         &ctdb->nodes[j]->available_public_ips);
1300                 if (ret != 0) {
1301                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1302                                 ctdb->nodes[j]->pnn));
1303                         if (culprit) {
1304                                 *culprit = ctdb->nodes[j]->pnn;
1305                         }
1306                         return -1;
1307                 }
1308         }
1309
1310         return 0;
1311 }
1312
1313 /* when we start a recovery, make sure all nodes use the same reclock file
1314    setting
1315 */
1316 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1317 {
1318         struct ctdb_context *ctdb = rec->ctdb;
1319         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1320         TDB_DATA data;
1321         uint32_t *nodes;
1322
1323         if (ctdb->recovery_lock_file == NULL) {
1324                 data.dptr  = NULL;
1325                 data.dsize = 0;
1326         } else {
1327                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1328                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1329         }
1330
1331         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1333                                         nodes, 0,
1334                                         CONTROL_TIMEOUT(),
1335                                         false, data,
1336                                         NULL, NULL,
1337                                         rec) != 0) {
1338                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1339                 talloc_free(tmp_ctx);
1340                 return -1;
1341         }
1342
1343         talloc_free(tmp_ctx);
1344         return 0;
1345 }
1346
1347
1348 /*
1349   we are the recmaster, and recovery is needed - start a recovery run
1350  */
1351 static int do_recovery(struct ctdb_recoverd *rec, 
1352                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1353                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1354 {
1355         struct ctdb_context *ctdb = rec->ctdb;
1356         int i, j, ret;
1357         uint32_t generation;
1358         struct ctdb_dbid_map *dbmap;
1359         TDB_DATA data;
1360         uint32_t *nodes;
1361         struct timeval start_time;
1362         uint32_t culprit = (uint32_t)-1;
1363
1364         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1365
1366         /* if recovery fails, force it again */
1367         rec->need_recovery = true;
1368
1369         for (i=0; i<ctdb->num_nodes; i++) {
1370                 struct ctdb_banning_state *ban_state;
1371
1372                 if (ctdb->nodes[i]->ban_state == NULL) {
1373                         continue;
1374                 }
1375                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1376                 if (ban_state->count < 2*ctdb->num_nodes) {
1377                         continue;
1378                 }
1379                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1380                         ctdb->nodes[i]->pnn, ban_state->count,
1381                         ctdb->tunable.recovery_ban_period));
1382                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1383                 ban_state->count = 0;
1384         }
1385
1386
1387         if (ctdb->tunable.verify_recovery_lock != 0) {
1388                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1389                 start_time = timeval_current();
1390                 if (!ctdb_recovery_lock(ctdb, true)) {
1391                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1392                                          "and ban ourself for %u seconds\n",
1393                                          ctdb->tunable.recovery_ban_period));
1394                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1395                         return -1;
1396                 }
1397                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1398                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1399         }
1400
1401         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1402
1403         /* get a list of all databases */
1404         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1407                 return -1;
1408         }
1409
1410         /* we do the db creation before we set the recovery mode, so the freeze happens
1411            on all databases we will be dealing with. */
1412
1413         /* verify that we have all the databases any other node has */
1414         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1415         if (ret != 0) {
1416                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1417                 return -1;
1418         }
1419
1420         /* verify that all other nodes have all our databases */
1421         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1422         if (ret != 0) {
1423                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1424                 return -1;
1425         }
1426         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1427
1428         /* update the database priority for all remote databases */
1429         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1430         if (ret != 0) {
1431                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1432         }
1433         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1434
1435
1436         /* update all other nodes to use the same setting for reclock files
1437            as the local recovery master.
1438         */
1439         sync_recovery_lock_file_across_cluster(rec);
1440
1441         /* set recovery mode to active on all nodes */
1442         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1445                 return -1;
1446         }
1447
1448         /* execute the "startrecovery" event script on all nodes */
1449         ret = run_startrecovery_eventscript(rec, nodemap);
1450         if (ret!=0) {
1451                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1452                 return -1;
1453         }
1454
1455         /*
1456           update all nodes to have the same flags that we have
1457          */
1458         for (i=0;i<nodemap->num;i++) {
1459                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1460                         continue;
1461                 }
1462
1463                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1464                 if (ret != 0) {
1465                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1466                         return -1;
1467                 }
1468         }
1469
1470         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1471
1472         /* pick a new generation number */
1473         generation = new_generation();
1474
1475         /* change the vnnmap on this node to use the new generation 
1476            number but not on any other nodes.
1477            this guarantees that if we abort the recovery prematurely
1478            for some reason (a node stops responding?)
1479            that we can just return immediately and we will reenter
1480            recovery shortly again.
1481            I.e. we deliberately leave the cluster with an inconsistent
1482            generation id to allow us to abort recovery at any stage and
1483            just restart it from scratch.
1484          */
1485         vnnmap->generation = generation;
1486         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1487         if (ret != 0) {
1488                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1489                 return -1;
1490         }
1491
1492         data.dptr = (void *)&generation;
1493         data.dsize = sizeof(uint32_t);
1494
1495         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1496         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1497                                         nodes, 0,
1498                                         CONTROL_TIMEOUT(), false, data,
1499                                         NULL,
1500                                         transaction_start_fail_callback,
1501                                         rec) != 0) {
1502                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1503                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1504                                         nodes, 0,
1505                                         CONTROL_TIMEOUT(), false, tdb_null,
1506                                         NULL,
1507                                         NULL,
1508                                         NULL) != 0) {
1509                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1510                 }
1511                 return -1;
1512         }
1513
1514         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1515
1516         for (i=0;i<dbmap->num;i++) {
1517                 ret = recover_database(rec, mem_ctx,
1518                                        dbmap->dbs[i].dbid,
1519                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1520                                        pnn, nodemap, generation);
1521                 if (ret != 0) {
1522                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1523                         return -1;
1524                 }
1525         }
1526
1527         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1528
1529         /* commit all the changes */
1530         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1531                                         nodes, 0,
1532                                         CONTROL_TIMEOUT(), false, data,
1533                                         NULL, NULL,
1534                                         NULL) != 0) {
1535                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1536                 return -1;
1537         }
1538
1539         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1540         
1541
1542         /* update the capabilities for all nodes */
1543         ret = update_capabilities(ctdb, nodemap);
1544         if (ret!=0) {
1545                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1546                 return -1;
1547         }
1548
1549         /* build a new vnn map with all the currently active and
1550            unbanned nodes */
1551         generation = new_generation();
1552         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1553         CTDB_NO_MEMORY(ctdb, vnnmap);
1554         vnnmap->generation = generation;
1555         vnnmap->size = 0;
1556         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1557         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1558         for (i=j=0;i<nodemap->num;i++) {
1559                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1560                         continue;
1561                 }
1562                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1563                         /* this node can not be an lmaster */
1564                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1565                         continue;
1566                 }
1567
1568                 vnnmap->size++;
1569                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1570                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1571                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1572
1573         }
1574         if (vnnmap->size == 0) {
1575                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1576                 vnnmap->size++;
1577                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1578                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1579                 vnnmap->map[0] = pnn;
1580         }       
1581
1582         /* update to the new vnnmap on all nodes */
1583         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1584         if (ret != 0) {
1585                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1586                 return -1;
1587         }
1588
1589         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1590
1591         /* update recmaster to point to us for all nodes */
1592         ret = set_recovery_master(ctdb, nodemap, pnn);
1593         if (ret!=0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1595                 return -1;
1596         }
1597
1598         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1599
1600         /*
1601           update all nodes to have the same flags that we have
1602          */
1603         for (i=0;i<nodemap->num;i++) {
1604                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1605                         continue;
1606                 }
1607
1608                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1609                 if (ret != 0) {
1610                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1611                         return -1;
1612                 }
1613         }
1614
1615         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1616
1617         /* disable recovery mode */
1618         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1619         if (ret != 0) {
1620                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1621                 return -1;
1622         }
1623
1624         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1625
1626         /*
1627           tell nodes to takeover their public IPs
1628          */
1629         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1630         if (ret != 0) {
1631                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1632                                  culprit));
1633                 rec->need_takeover_run = true;
1634                 return -1;
1635         }
1636         rec->need_takeover_run = false;
1637         ret = ctdb_takeover_run(ctdb, nodemap);
1638         if (ret != 0) {
1639                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1640                 rec->need_takeover_run = true;
1641         }
1642
1643         /* execute the "recovered" event script on all nodes */
1644         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1645         if (ret!=0) {
1646                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1647                 return -1;
1648         }
1649
1650         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1651
1652         /* send a message to all clients telling them that the cluster 
1653            has been reconfigured */
1654         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1655
1656         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1657
1658         rec->need_recovery = false;
1659
1660         /* we managed to complete a full recovery, make sure to forgive
1661            any past sins by the nodes that could now participate in the
1662            recovery.
1663         */
1664         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1665         for (i=0;i<nodemap->num;i++) {
1666                 struct ctdb_banning_state *ban_state;
1667
1668                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1669                         continue;
1670                 }
1671
1672                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1673                 if (ban_state == NULL) {
1674                         continue;
1675                 }
1676
1677                 ban_state->count = 0;
1678         }
1679
1680
1681         /* We just finished a recovery successfully. 
1682            We now wait for rerecovery_timeout before we allow 
1683            another recovery to take place.
1684         */
1685         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1686         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1687         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1688
1689         return 0;
1690 }
1691
1692
/*
  payload of an election message; the struct is sent as-is as the
  message data by send_election_request()

  elections are won by first checking the number of connected nodes, then
  the priority time, then the pnn
 */
struct election_message {
        uint32_t num_connected;         /* nodes the sender does not see as DISCONNECTED;
                                           forced to 0 when the sender lacks CTDB_CAP_RECMASTER */
        struct timeval priority_time;   /* second criterion, compared with timeval_compare() */
        uint32_t pnn;                   /* pnn of the sending node, last tie-breaker */
        uint32_t node_flags;            /* sender's own node flags; a BANNED or STOPPED
                                           sender loses automatically */
};
1703
1704 /*
1705   form this nodes election data
1706  */
1707 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1708 {
1709         int ret, i;
1710         struct ctdb_node_map *nodemap;
1711         struct ctdb_context *ctdb = rec->ctdb;
1712
1713         ZERO_STRUCTP(em);
1714
1715         em->pnn = rec->ctdb->pnn;
1716         em->priority_time = rec->priority_time;
1717
1718         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1719         if (ret != 0) {
1720                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1721                 return;
1722         }
1723
1724         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1725         em->node_flags = rec->node_flags;
1726
1727         for (i=0;i<nodemap->num;i++) {
1728                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1729                         em->num_connected++;
1730                 }
1731         }
1732
1733         /* we shouldnt try to win this election if we cant be a recmaster */
1734         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1735                 em->num_connected = 0;
1736                 em->priority_time = timeval_current();
1737         }
1738
1739         talloc_free(nodemap);
1740 }
1741
1742 /*
1743   see if the given election data wins
1744  */
1745 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1746 {
1747         struct election_message myem;
1748         int cmp = 0;
1749
1750         ctdb_election_data(rec, &myem);
1751
1752         /* we cant win if we dont have the recmaster capability */
1753         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1754                 return false;
1755         }
1756
1757         /* we cant win if we are banned */
1758         if (rec->node_flags & NODE_FLAGS_BANNED) {
1759                 return false;
1760         }       
1761
1762         /* we cant win if we are stopped */
1763         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1764                 return false;
1765         }       
1766
1767         /* we will automatically win if the other node is banned */
1768         if (em->node_flags & NODE_FLAGS_BANNED) {
1769                 return true;
1770         }
1771
1772         /* we will automatically win if the other node is banned */
1773         if (em->node_flags & NODE_FLAGS_STOPPED) {
1774                 return true;
1775         }
1776
1777         /* try to use the most connected node */
1778         if (cmp == 0) {
1779                 cmp = (int)myem.num_connected - (int)em->num_connected;
1780         }
1781
1782         /* then the longest running node */
1783         if (cmp == 0) {
1784                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1785         }
1786
1787         if (cmp == 0) {
1788                 cmp = (int)myem.pnn - (int)em->pnn;
1789         }
1790
1791         return cmp > 0;
1792 }
1793
1794 /*
1795   send out an election request
1796  */
1797 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1798 {
1799         int ret;
1800         TDB_DATA election_data;
1801         struct election_message emsg;
1802         uint64_t srvid;
1803         struct ctdb_context *ctdb = rec->ctdb;
1804
1805         srvid = CTDB_SRVID_RECOVERY;
1806
1807         ctdb_election_data(rec, &emsg);
1808
1809         election_data.dsize = sizeof(struct election_message);
1810         election_data.dptr  = (unsigned char *)&emsg;
1811
1812
1813         /* send an election message to all active nodes */
1814         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1815         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1816
1817
1818         /* A new node that is already frozen has entered the cluster.
1819            The existing nodes are not frozen and dont need to be frozen
1820            until the election has ended and we start the actual recovery
1821         */
1822         if (update_recmaster == true) {
1823                 /* first we assume we will win the election and set 
1824                    recoverymaster to be ourself on the current node
1825                  */
1826                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1827                 if (ret != 0) {
1828                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1829                         return -1;
1830                 }
1831         }
1832
1833
1834         return 0;
1835 }
1836
1837 /*
1838   this function will unban all nodes in the cluster
1839 */
1840 static void unban_all_nodes(struct ctdb_context *ctdb)
1841 {
1842         int ret, i;
1843         struct ctdb_node_map *nodemap;
1844         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1845         
1846         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1847         if (ret != 0) {
1848                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1849                 return;
1850         }
1851
1852         for (i=0;i<nodemap->num;i++) {
1853                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1854                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1855                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1856                 }
1857         }
1858
1859         talloc_free(tmp_ctx);
1860 }
1861
1862
1863 /*
1864   we think we are winning the election - send a broadcast election request
1865  */
1866 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1867 {
1868         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1869         int ret;
1870
1871         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1872         if (ret != 0) {
1873                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1874         }
1875
1876         talloc_free(rec->send_election_te);
1877         rec->send_election_te = NULL;
1878 }
1879
1880 /*
1881   handler for memory dumps
1882 */
1883 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1884                              TDB_DATA data, void *private_data)
1885 {
1886         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1887         TDB_DATA *dump;
1888         int ret;
1889         struct rd_memdump_reply *rd;
1890
1891         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1893                 talloc_free(tmp_ctx);
1894                 return;
1895         }
1896         rd = (struct rd_memdump_reply *)data.dptr;
1897
1898         dump = talloc_zero(tmp_ctx, TDB_DATA);
1899         if (dump == NULL) {
1900                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1901                 talloc_free(tmp_ctx);
1902                 return;
1903         }
1904         ret = ctdb_dump_memory(ctdb, dump);
1905         if (ret != 0) {
1906                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1907                 talloc_free(tmp_ctx);
1908                 return;
1909         }
1910
1911 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1912
1913         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1914         if (ret != 0) {
1915                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1916                 talloc_free(tmp_ctx);
1917                 return;
1918         }
1919
1920         talloc_free(tmp_ctx);
1921 }
1922
1923 /*
1924   handler for reload_nodes
1925 */
1926 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1927                              TDB_DATA data, void *private_data)
1928 {
1929         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1930
1931         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1932
1933         reload_nodes_file(rec->ctdb);
1934 }
1935
1936
1937 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1938                               struct timeval yt, void *p)
1939 {
1940         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1941
1942         talloc_free(rec->ip_check_disable_ctx);
1943         rec->ip_check_disable_ctx = NULL;
1944 }
1945
1946
1947 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1948                              TDB_DATA data, void *private_data)
1949 {
1950         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1951         struct ctdb_public_ip *ip;
1952
1953         if (rec->recmaster != rec->ctdb->pnn) {
1954                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1955                 return;
1956         }
1957
1958         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1959                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1960                 return;
1961         }
1962
1963         ip = (struct ctdb_public_ip *)data.dptr;
1964
1965         update_ip_assignment_tree(rec->ctdb, ip);
1966 }
1967
1968
1969 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1970                              TDB_DATA data, void *private_data)
1971 {
1972         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1973         uint32_t timeout;
1974
1975         if (rec->ip_check_disable_ctx != NULL) {
1976                 talloc_free(rec->ip_check_disable_ctx);
1977                 rec->ip_check_disable_ctx = NULL;
1978         }
1979
1980         if (data.dsize != sizeof(uint32_t)) {
1981                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1982                                  "expexting %lu\n", (long unsigned)data.dsize,
1983                                  (long unsigned)sizeof(uint32_t)));
1984                 return;
1985         }
1986         if (data.dptr == NULL) {
1987                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1988                 return;
1989         }
1990
1991         timeout = *((uint32_t *)data.dptr);
1992         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1993
1994         rec->ip_check_disable_ctx = talloc_new(rec);
1995         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1996
1997         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1998 }
1999
2000
2001 /*
2002   handler for ip reallocate, just add it to the list of callers and 
2003   handle this later in the monitor_cluster loop so we do not recurse
2004   with other callers to takeover_run()
2005 */
2006 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2007                              TDB_DATA data, void *private_data)
2008 {
2009         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2010         struct ip_reallocate_list *caller;
2011
2012         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2013                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2014                 return;
2015         }
2016
2017         if (rec->ip_reallocate_ctx == NULL) {
2018                 rec->ip_reallocate_ctx = talloc_new(rec);
2019                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2020         }
2021
2022         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2023         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2024
2025         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2026         caller->next = rec->reallocate_callers;
2027         rec->reallocate_callers = caller;
2028
2029         return;
2030 }
2031
2032 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2033 {
2034         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2035         TDB_DATA result;
2036         int32_t ret;
2037         struct ip_reallocate_list *callers;
2038         uint32_t culprit;
2039
2040         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2041
2042         /* update the list of public ips that a node can handle for
2043            all connected nodes
2044         */
2045         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2048                                  culprit));
2049                 rec->need_takeover_run = true;
2050         }
2051         if (ret == 0) {
2052                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2053                 if (ret != 0) {
2054                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2055                         rec->need_takeover_run = true;
2056                 }
2057         }
2058
2059         result.dsize = sizeof(int32_t);
2060         result.dptr  = (uint8_t *)&ret;
2061
2062         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2063
2064                 /* Someone that sent srvid==0 does not want a reply */
2065                 if (callers->rd->srvid == 0) {
2066                         continue;
2067                 }
2068                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2069                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2070                                   (unsigned long long)callers->rd->srvid));
2071                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2072                 if (ret != 0) {
2073                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2074                                          "message to %u:%llu\n",
2075                                          (unsigned)callers->rd->pnn,
2076                                          (unsigned long long)callers->rd->srvid));
2077                 }
2078         }
2079
2080         talloc_free(tmp_ctx);
2081         talloc_free(rec->ip_reallocate_ctx);
2082         rec->ip_reallocate_ctx = NULL;
2083         rec->reallocate_callers = NULL;
2084         
2085 }
2086
2087
2088 /*
2089   handler for recovery master elections
2090 */
2091 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2092                              TDB_DATA data, void *private_data)
2093 {
2094         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2095         int ret;
2096         struct election_message *em = (struct election_message *)data.dptr;
2097         TALLOC_CTX *mem_ctx;
2098
2099         /* we got an election packet - update the timeout for the election */
2100         talloc_free(rec->election_timeout);
2101         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2102                                                 fast_start ?
2103                                                 timeval_current_ofs(0, 500000) :
2104                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2105                                                 ctdb_election_timeout, rec);
2106
2107         mem_ctx = talloc_new(ctdb);
2108
2109         /* someone called an election. check their election data
2110            and if we disagree and we would rather be the elected node, 
2111            send a new election message to all other nodes
2112          */
2113         if (ctdb_election_win(rec, em)) {
2114                 if (!rec->send_election_te) {
2115                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2116                                                                 timeval_current_ofs(0, 500000),
2117                                                                 election_send_request, rec);
2118                 }
2119                 talloc_free(mem_ctx);
2120                 /*unban_all_nodes(ctdb);*/
2121                 return;
2122         }
2123         
2124         /* we didn't win */
2125         talloc_free(rec->send_election_te);
2126         rec->send_election_te = NULL;
2127
2128         if (ctdb->tunable.verify_recovery_lock != 0) {
2129                 /* release the recmaster lock */
2130                 if (em->pnn != ctdb->pnn &&
2131                     ctdb->recovery_lock_fd != -1) {
2132                         close(ctdb->recovery_lock_fd);
2133                         ctdb->recovery_lock_fd = -1;
2134                         unban_all_nodes(ctdb);
2135                 }
2136         }
2137
2138         /* ok, let that guy become recmaster then */
2139         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2140         if (ret != 0) {
2141                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2142                 talloc_free(mem_ctx);
2143                 return;
2144         }
2145
2146         talloc_free(mem_ctx);
2147         return;
2148 }
2149
2150
2151 /*
2152   force the start of the election process
2153  */
2154 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2155                            struct ctdb_node_map *nodemap)
2156 {
2157         int ret;
2158         struct ctdb_context *ctdb = rec->ctdb;
2159
2160         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2161
2162         /* set all nodes to recovery mode to stop all internode traffic */
2163         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2164         if (ret != 0) {
2165                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2166                 return;
2167         }
2168
2169         talloc_free(rec->election_timeout);
2170         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2171                                                 fast_start ?
2172                                                 timeval_current_ofs(0, 500000) :
2173                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2174                                                 ctdb_election_timeout, rec);
2175
2176         ret = send_election_request(rec, pnn, true);
2177         if (ret!=0) {
2178                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2179                 return;
2180         }
2181
2182         /* wait for a few seconds to collect all responses */
2183         ctdb_wait_election(rec);
2184 }
2185
2186
2187
2188 /*
2189   handler for when a node changes its flags
2190 */
2191 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2192                             TDB_DATA data, void *private_data)
2193 {
2194         int ret;
2195         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2196         struct ctdb_node_map *nodemap=NULL;
2197         TALLOC_CTX *tmp_ctx;
2198         uint32_t changed_flags;
2199         int i;
2200         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2201         int disabled_flag_changed;
2202
2203         if (data.dsize != sizeof(*c)) {
2204                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2205                 return;
2206         }
2207
2208         tmp_ctx = talloc_new(ctdb);
2209         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2210
2211         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2212         if (ret != 0) {
2213                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2214                 talloc_free(tmp_ctx);
2215                 return;         
2216         }
2217
2218
2219         for (i=0;i<nodemap->num;i++) {
2220                 if (nodemap->nodes[i].pnn == c->pnn) break;
2221         }
2222
2223         if (i == nodemap->num) {
2224                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2225                 talloc_free(tmp_ctx);
2226                 return;
2227         }
2228
2229         changed_flags = c->old_flags ^ c->new_flags;
2230
2231         if (nodemap->nodes[i].flags != c->new_flags) {
2232                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2233         }
2234
2235         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2236
2237         nodemap->nodes[i].flags = c->new_flags;
2238
2239         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2240                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2241
2242         if (ret == 0) {
2243                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2244                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2245         }
2246         
2247         if (ret == 0 &&
2248             ctdb->recovery_master == ctdb->pnn &&
2249             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2250                 /* Only do the takeover run if the perm disabled or unhealthy
2251                    flags changed since these will cause an ip failover but not
2252                    a recovery.
2253                    If the node became disconnected or banned this will also
2254                    lead to an ip address failover but that is handled 
2255                    during recovery
2256                 */
2257                 if (disabled_flag_changed) {
2258                         rec->need_takeover_run = true;
2259                 }
2260         }
2261
2262         talloc_free(tmp_ctx);
2263 }
2264
2265 /*
2266   handler for when we need to push out flag changes ot all other nodes
2267 */
2268 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2269                             TDB_DATA data, void *private_data)
2270 {
2271         int ret;
2272         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2273         struct ctdb_node_map *nodemap=NULL;
2274         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2275         uint32_t recmaster;
2276         uint32_t *nodes;
2277
2278         /* find the recovery master */
2279         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2280         if (ret != 0) {
2281                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2282                 talloc_free(tmp_ctx);
2283                 return;
2284         }
2285
2286         /* read the node flags from the recmaster */
2287         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2288         if (ret != 0) {
2289                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2290                 talloc_free(tmp_ctx);
2291                 return;
2292         }
2293         if (c->pnn >= nodemap->num) {
2294                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2295                 talloc_free(tmp_ctx);
2296                 return;
2297         }
2298
2299         /* send the flags update to all connected nodes */
2300         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2301
2302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2303                                       nodes, 0, CONTROL_TIMEOUT(),
2304                                       false, data,
2305                                       NULL, NULL,
2306                                       NULL) != 0) {
2307                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2308
2309                 talloc_free(tmp_ctx);
2310                 return;
2311         }
2312
2313         talloc_free(tmp_ctx);
2314 }
2315
2316
/*
  state shared between verify_recmode() and
  verify_recmode_normal_callback()
 */
struct verify_recmode_normal_data {
        uint32_t count;                 /* decremented by the callback for every control that completes */
        enum monitor_result status;     /* initialised to MONITOR_OK; the callback changes it when a
                                           control fails or a node is not in normal recovery mode */
};
2321
2322 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2323 {
2324         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2325
2326
2327         /* one more node has responded with recmode data*/
2328         rmdata->count--;
2329
2330         /* if we failed to get the recmode, then return an error and let
2331            the main loop try again.
2332         */
2333         if (state->state != CTDB_CONTROL_DONE) {
2334                 if (rmdata->status == MONITOR_OK) {
2335                         rmdata->status = MONITOR_FAILED;
2336                 }
2337                 return;
2338         }
2339
2340         /* if we got a response, then the recmode will be stored in the
2341            status field
2342         */
2343         if (state->status != CTDB_RECOVERY_NORMAL) {
2344                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2345                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2346         }
2347
2348         return;
2349 }
2350
2351
2352 /* verify that all nodes are in normal recovery mode */
2353 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2354 {
2355         struct verify_recmode_normal_data *rmdata;
2356         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2357         struct ctdb_client_control_state *state;
2358         enum monitor_result status;
2359         int j;
2360         
2361         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2362         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2363         rmdata->count  = 0;
2364         rmdata->status = MONITOR_OK;
2365
2366         /* loop over all active nodes and send an async getrecmode call to 
2367            them*/
2368         for (j=0; j<nodemap->num; j++) {
2369                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2370                         continue;
2371                 }
2372                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2373                                         CONTROL_TIMEOUT(), 
2374                                         nodemap->nodes[j].pnn);
2375                 if (state == NULL) {
2376                         /* we failed to send the control, treat this as 
2377                            an error and try again next iteration
2378                         */                      
2379                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2380                         talloc_free(mem_ctx);
2381                         return MONITOR_FAILED;
2382                 }
2383
2384                 /* set up the callback functions */
2385                 state->async.fn = verify_recmode_normal_callback;
2386                 state->async.private_data = rmdata;
2387
2388                 /* one more control to wait for to complete */
2389                 rmdata->count++;
2390         }
2391
2392
2393         /* now wait for up to the maximum number of seconds allowed
2394            or until all nodes we expect a response from has replied
2395         */
2396         while (rmdata->count > 0) {
2397                 event_loop_once(ctdb->ev);
2398         }
2399
2400         status = rmdata->status;
2401         talloc_free(mem_ctx);
2402         return status;
2403 }
2404
2405
2406 struct verify_recmaster_data {
2407         struct ctdb_recoverd *rec;
2408         uint32_t count;
2409         uint32_t pnn;
2410         enum monitor_result status;
2411 };
2412
2413 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2414 {
2415         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2416
2417
2418         /* one more node has responded with recmaster data*/
2419         rmdata->count--;
2420
2421         /* if we failed to get the recmaster, then return an error and let
2422            the main loop try again.
2423         */
2424         if (state->state != CTDB_CONTROL_DONE) {
2425                 if (rmdata->status == MONITOR_OK) {
2426                         rmdata->status = MONITOR_FAILED;
2427                 }
2428                 return;
2429         }
2430
2431         /* if we got a response, then the recmaster will be stored in the
2432            status field
2433         */
2434         if (state->status != rmdata->pnn) {
2435                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2436                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2437                 rmdata->status = MONITOR_ELECTION_NEEDED;
2438         }
2439
2440         return;
2441 }
2442
2443
2444 /* verify that all nodes agree that we are the recmaster */
2445 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2446 {
2447         struct ctdb_context *ctdb = rec->ctdb;
2448         struct verify_recmaster_data *rmdata;
2449         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2450         struct ctdb_client_control_state *state;
2451         enum monitor_result status;
2452         int j;
2453         
2454         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2455         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2456         rmdata->rec    = rec;
2457         rmdata->count  = 0;
2458         rmdata->pnn    = pnn;
2459         rmdata->status = MONITOR_OK;
2460
2461         /* loop over all active nodes and send an async getrecmaster call to 
2462            them*/
2463         for (j=0; j<nodemap->num; j++) {
2464                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2465                         continue;
2466                 }
2467                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2468                                         CONTROL_TIMEOUT(),
2469                                         nodemap->nodes[j].pnn);
2470                 if (state == NULL) {
2471                         /* we failed to send the control, treat this as 
2472                            an error and try again next iteration
2473                         */                      
2474                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2475                         talloc_free(mem_ctx);
2476                         return MONITOR_FAILED;
2477                 }
2478
2479                 /* set up the callback functions */
2480                 state->async.fn = verify_recmaster_callback;
2481                 state->async.private_data = rmdata;
2482
2483                 /* one more control to wait for to complete */
2484                 rmdata->count++;
2485         }
2486
2487
2488         /* now wait for up to the maximum number of seconds allowed
2489            or until all nodes we expect a response from has replied
2490         */
2491         while (rmdata->count > 0) {
2492                 event_loop_once(ctdb->ev);
2493         }
2494
2495         status = rmdata->status;
2496         talloc_free(mem_ctx);
2497         return status;
2498 }
2499
2500
2501 /* called to check that the local allocation of public ip addresses is ok.
2502 */
2503 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2504 {
2505         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2506         struct ctdb_control_get_ifaces *ifaces = NULL;
2507         struct ctdb_all_public_ips *ips = NULL;
2508         struct ctdb_uptime *uptime1 = NULL;
2509         struct ctdb_uptime *uptime2 = NULL;
2510         int ret, j;
2511         bool need_iface_check = false;
2512         bool need_takeover_run = false;
2513
2514         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2515                                 CTDB_CURRENT_NODE, &uptime1);
2516         if (ret != 0) {
2517                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2518                 talloc_free(mem_ctx);
2519                 return -1;
2520         }
2521
2522
2523         /* read the interfaces from the local node */
2524         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2525         if (ret != 0) {
2526                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2527                 talloc_free(mem_ctx);
2528                 return -1;
2529         }
2530
2531         if (!rec->ifaces) {
2532                 need_iface_check = true;
2533         } else if (rec->ifaces->num != ifaces->num) {
2534                 need_iface_check = true;
2535         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2536                 need_iface_check = true;
2537         }
2538
2539         if (need_iface_check) {
2540                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2541                                      "local node %u - force takeover run\n",
2542                                      pnn));
2543                 need_takeover_run = true;
2544         }
2545
2546         /* read the ip allocation from the local node */
2547         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2548         if (ret != 0) {
2549                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2550                 talloc_free(mem_ctx);
2551                 return -1;
2552         }
2553
2554         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2555                                 CTDB_CURRENT_NODE, &uptime2);
2556         if (ret != 0) {
2557                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2558                 talloc_free(mem_ctx);
2559                 return -1;
2560         }
2561
2562         /* skip the check if the startrecovery time has changed */
2563         if (timeval_compare(&uptime1->last_recovery_started,
2564                             &uptime2->last_recovery_started) != 0) {
2565                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2566                 talloc_free(mem_ctx);
2567                 return 0;
2568         }
2569
2570         /* skip the check if the endrecovery time has changed */
2571         if (timeval_compare(&uptime1->last_recovery_finished,
2572                             &uptime2->last_recovery_finished) != 0) {
2573                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2574                 talloc_free(mem_ctx);
2575                 return 0;
2576         }
2577
2578         /* skip the check if we have started but not finished recovery */
2579         if (timeval_compare(&uptime1->last_recovery_finished,
2580                             &uptime1->last_recovery_started) != 1) {
2581                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2582                 talloc_free(mem_ctx);
2583
2584                 return 0;
2585         }
2586
2587         talloc_free(rec->ifaces);
2588         rec->ifaces = talloc_steal(rec, ifaces);
2589
2590         /* verify that we have the ip addresses we should have
2591            and we dont have ones we shouldnt have.
2592            if we find an inconsistency we set recmode to
2593            active on the local node and wait for the recmaster
2594            to do a full blown recovery.
2595            also if the pnn is -1 and we are healthy and can host the ip
2596            we also request a ip reallocation.
2597         */
2598         if (ctdb->tunable.disable_ip_failover == 0) {
2599                 for (j=0; j<ips->num; j++) {
2600                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2601                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2602                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2603                                 need_takeover_run = true;
2604                         } else if (ips->ips[j].pnn == pnn) {
2605                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2606                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2607                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2608                                         need_takeover_run = true;
2609                                 }
2610                         } else {
2611                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2612                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2613                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2614                                         need_takeover_run = true;
2615                                 }
2616                         }
2617                 }
2618         }
2619
2620         if (need_takeover_run) {
2621                 struct takeover_run_reply rd;
2622                 TDB_DATA data;
2623
2624                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2625
2626                 rd.pnn = ctdb->pnn;
2627                 rd.srvid = 0;
2628                 data.dptr = (uint8_t *)&rd;
2629                 data.dsize = sizeof(rd);
2630
2631                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2632                 if (ret != 0) {
2633                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2634                 }
2635         }
2636         talloc_free(mem_ctx);
2637         return 0;
2638 }
2639
2640
2641 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2642 {
2643         struct ctdb_node_map **remote_nodemaps = callback_data;
2644
2645         if (node_pnn >= ctdb->num_nodes) {
2646                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2647                 return;
2648         }
2649
2650         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2651
2652 }
2653
2654 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2655         struct ctdb_node_map *nodemap,
2656         struct ctdb_node_map **remote_nodemaps)
2657 {
2658         uint32_t *nodes;
2659
2660         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2661         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2662                                         nodes, 0,
2663                                         CONTROL_TIMEOUT(), false, tdb_null,
2664                                         async_getnodemap_callback,
2665                                         NULL,
2666                                         remote_nodemaps) != 0) {
2667                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2668
2669                 return -1;
2670         }
2671
2672         return 0;
2673 }
2674
/* progress/result of the reclock check child.
   RECLOCK_OK and RECLOCK_FAILED are also the byte values the child
   writes to the pipe, so the order of the enumerators must not change.
*/
enum reclock_child_status {
        RECLOCK_CHECKING,       /* set when the check starts */
        RECLOCK_OK,             /* child reported success */
        RECLOCK_FAILED,         /* child reported an error, or the read failed */
        RECLOCK_TIMEOUT         /* set by the timeout handler */
};

/* state for one run of check_recovery_lock() */
struct ctdb_check_reclock_state {
        struct ctdb_context *ctdb;
        /* when the check started; used for the latency report */
        struct timeval start_time;
        /* pipe from the child; -1 marks an end that is not open */
        int fd[2];
        /* pid of the child doing the read on the reclock file */
        pid_t child;
        /* timeout for the child's answer */
        struct timed_event *te;
        /* read event on fd[0] */
        struct fd_event *fde;
        enum reclock_child_status status;
};
2685
2686 /* when we free the reclock state we must kill any child process.
2687 */
2688 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2689 {
2690         struct ctdb_context *ctdb = state->ctdb;
2691
2692         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2693
2694         if (state->fd[0] != -1) {
2695                 close(state->fd[0]);
2696                 state->fd[0] = -1;
2697         }
2698         if (state->fd[1] != -1) {
2699                 close(state->fd[1]);
2700                 state->fd[1] = -1;
2701         }
2702         kill(state->child, SIGKILL);
2703         return 0;
2704 }
2705
2706 /*
2707   called if our check_reclock child times out. this would happen if
2708   i/o to the reclock file blocks.
2709  */
2710 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2711                                          struct timeval t, void *private_data)
2712 {
2713         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2714                                            struct ctdb_check_reclock_state);
2715
2716         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2717         state->status = RECLOCK_TIMEOUT;
2718 }
2719
2720 /* this is called when the child process has completed checking the reclock
2721    file and has written data back to us through the pipe.
2722 */
2723 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2724                              uint16_t flags, void *private_data)
2725 {
2726         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2727                                              struct ctdb_check_reclock_state);
2728         char c = 0;
2729         int ret;
2730
2731         /* we got a response from our child process so we can abort the
2732            timeout.
2733         */
2734         talloc_free(state->te);
2735         state->te = NULL;
2736
2737         ret = read(state->fd[0], &c, 1);
2738         if (ret != 1 || c != RECLOCK_OK) {
2739                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2740                 state->status = RECLOCK_FAILED;
2741
2742                 return;
2743         }
2744
2745         state->status = RECLOCK_OK;
2746         return;
2747 }
2748
2749 static int check_recovery_lock(struct ctdb_context *ctdb)
2750 {
2751         int ret;
2752         struct ctdb_check_reclock_state *state;
2753         pid_t parent = getpid();
2754
2755         if (ctdb->recovery_lock_fd == -1) {
2756                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2757                 return -1;
2758         }
2759
2760         state = talloc(ctdb, struct ctdb_check_reclock_state);
2761         CTDB_NO_MEMORY(ctdb, state);
2762
2763         state->ctdb = ctdb;
2764         state->start_time = timeval_current();
2765         state->status = RECLOCK_CHECKING;
2766         state->fd[0] = -1;
2767         state->fd[1] = -1;
2768
2769         ret = pipe(state->fd);
2770         if (ret != 0) {
2771                 talloc_free(state);
2772                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2773                 return -1;
2774         }
2775
2776         state->child = ctdb_fork(ctdb);
2777         if (state->child == (pid_t)-1) {
2778                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2779                 close(state->fd[0]);
2780                 state->fd[0] = -1;
2781                 close(state->fd[1]);
2782                 state->fd[1] = -1;
2783                 talloc_free(state);
2784                 return -1;
2785         }
2786
2787         if (state->child == 0) {
2788                 char cc = RECLOCK_OK;
2789                 close(state->fd[0]);
2790                 state->fd[0] = -1;
2791
2792                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2793                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2794                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2795                         cc = RECLOCK_FAILED;
2796                 }
2797
2798                 write(state->fd[1], &cc, 1);
2799                 /* make sure we die when our parent dies */
2800                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2801                         sleep(5);
2802                         write(state->fd[1], &cc, 1);
2803                 }
2804                 _exit(0);
2805         }
2806         close(state->fd[1]);
2807         state->fd[1] = -1;
2808         set_close_on_exec(state->fd[0]);
2809
2810         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2811
2812         talloc_set_destructor(state, check_reclock_destructor);
2813
2814         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2815                                     ctdb_check_reclock_timeout, state);
2816         if (state->te == NULL) {
2817                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2818                 talloc_free(state);
2819                 return -1;
2820         }
2821
2822         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2823                                 EVENT_FD_READ,
2824                                 reclock_child_handler,
2825                                 (void *)state);
2826
2827         if (state->fde == NULL) {
2828                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2829                 talloc_free(state);
2830                 return -1;
2831         }
2832         tevent_fd_set_auto_close(state->fde);
2833
2834         while (state->status == RECLOCK_CHECKING) {
2835                 event_loop_once(ctdb->ev);
2836         }
2837
2838         if (state->status == RECLOCK_FAILED) {
2839                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2840                 close(ctdb->recovery_lock_fd);
2841                 ctdb->recovery_lock_fd = -1;
2842                 talloc_free(state);
2843                 return -1;
2844         }
2845
2846         talloc_free(state);
2847         return 0;
2848 }
2849
2850 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2851 {
2852         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2853         const char *reclockfile;
2854
2855         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2856                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2857                 talloc_free(tmp_ctx);
2858                 return -1;      
2859         }
2860
2861         if (reclockfile == NULL) {
2862                 if (ctdb->recovery_lock_file != NULL) {
2863                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2864                         talloc_free(ctdb->recovery_lock_file);
2865                         ctdb->recovery_lock_file = NULL;
2866                         if (ctdb->recovery_lock_fd != -1) {
2867                                 close(ctdb->recovery_lock_fd);
2868                                 ctdb->recovery_lock_fd = -1;
2869                         }
2870                 }
2871                 ctdb->tunable.verify_recovery_lock = 0;
2872                 talloc_free(tmp_ctx);
2873                 return 0;
2874         }
2875
2876         if (ctdb->recovery_lock_file == NULL) {
2877                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2878                 if (ctdb->recovery_lock_fd != -1) {
2879                         close(ctdb->recovery_lock_fd);
2880                         ctdb->recovery_lock_fd = -1;
2881                 }
2882                 talloc_free(tmp_ctx);
2883                 return 0;
2884         }
2885
2886
2887         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2888                 talloc_free(tmp_ctx);
2889                 return 0;
2890         }
2891
2892         talloc_free(ctdb->recovery_lock_file);
2893         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2894         ctdb->tunable.verify_recovery_lock = 0;
2895         if (ctdb->recovery_lock_fd != -1) {
2896                 close(ctdb->recovery_lock_fd);
2897                 ctdb->recovery_lock_fd = -1;
2898         }
2899
2900         talloc_free(tmp_ctx);
2901         return 0;
2902 }
2903
2904 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2905                       TALLOC_CTX *mem_ctx)
2906 {
2907         uint32_t pnn;
2908         struct ctdb_node_map *nodemap=NULL;
2909         struct ctdb_node_map *recmaster_nodemap=NULL;
2910         struct ctdb_node_map **remote_nodemaps=NULL;
2911         struct ctdb_vnn_map *vnnmap=NULL;
2912         struct ctdb_vnn_map *remote_vnnmap=NULL;
2913         int32_t debug_level;
2914         int i, j, ret;
2915
2916
2917
2918         /* verify that the main daemon is still running */
2919         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2920                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2921                 exit(-1);
2922         }
2923
2924         /* ping the local daemon to tell it we are alive */
2925         ctdb_ctrl_recd_ping(ctdb);
2926
2927         if (rec->election_timeout) {
2928                 /* an election is in progress */
2929                 return;
2930         }
2931
2932         /* read the debug level from the parent and update locally */
2933         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2934         if (ret !=0) {
2935                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2936                 return;
2937         }
2938         LogLevel = debug_level;
2939
2940
2941         /* We must check if we need to ban a node here but we want to do this
2942            as early as possible so we dont wait until we have pulled the node
2943            map from the local node. thats why we have the hardcoded value 20
2944         */
2945         for (i=0; i<ctdb->num_nodes; i++) {
2946                 struct ctdb_banning_state *ban_state;
2947
2948                 if (ctdb->nodes[i]->ban_state == NULL) {
2949                         continue;
2950                 }
2951                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2952                 if (ban_state->count < 20) {
2953                         continue;
2954                 }
2955                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2956                         ctdb->nodes[i]->pnn, ban_state->count,
2957                         ctdb->tunable.recovery_ban_period));
2958                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2959                 ban_state->count = 0;
2960         }
2961
2962         /* get relevant tunables */
2963         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2964         if (ret != 0) {
2965                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2966                 return;
2967         }
2968
2969         /* get the current recovery lock file from the server */
2970         if (update_recovery_lock_file(ctdb) != 0) {
2971                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2972                 return;
2973         }
2974
2975         /* Make sure that if recovery lock verification becomes disabled when
2976            we close the file
2977         */
2978         if (ctdb->tunable.verify_recovery_lock == 0) {
2979                 if (ctdb->recovery_lock_fd != -1) {
2980                         close(ctdb->recovery_lock_fd);
2981                         ctdb->recovery_lock_fd = -1;
2982                 }
2983         }
2984
2985         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2986         if (pnn == (uint32_t)-1) {
2987                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2988                 return;
2989         }
2990
2991         /* get the vnnmap */
2992         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2995                 return;
2996         }
2997
2998
2999         /* get number of nodes */
3000         if (rec->nodemap) {
3001                 talloc_free(rec->nodemap);
3002                 rec->nodemap = NULL;
3003                 nodemap=NULL;
3004         }
3005         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3006         if (ret != 0) {
3007                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3008                 return;
3009         }
3010         nodemap = rec->nodemap;
3011
3012         /* check which node is the recovery master */
3013         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3014         if (ret != 0) {
3015                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3016                 return;
3017         }
3018
3019         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3020         if (rec->recmaster != pnn) {
3021                 if (rec->ip_reallocate_ctx != NULL) {
3022                         talloc_free(rec->ip_reallocate_ctx);
3023                         rec->ip_reallocate_ctx = NULL;
3024                         rec->reallocate_callers = NULL;
3025                 }
3026         }
3027
3028         if (rec->recmaster == (uint32_t)-1) {
3029                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3030                 force_election(rec, pnn, nodemap);
3031                 return;
3032         }
3033
3034
3035         /* if the local daemon is STOPPED, we verify that the databases are
3036            also frozen and thet the recmode is set to active 
3037         */
3038         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3039                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3040                 if (ret != 0) {
3041                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3042                 }
3043                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3044                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3045
3046                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3047                         if (ret != 0) {
3048                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3049                                 return;
3050                         }
3051                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3052                         if (ret != 0) {
3053                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3054
3055                                 return;
3056                         }
3057                         return;
3058                 }
3059         }
3060         /* If the local node is stopped, verify we are not the recmaster 
3061            and yield this role if so
3062         */
3063         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3064                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3065                 force_election(rec, pnn, nodemap);
3066                 return;
3067         }
3068         
3069         /* check that we (recovery daemon) and the local ctdb daemon
3070            agrees on whether we are banned or not
3071         */
3072 //qqq
3073
3074         /* remember our own node flags */
3075         rec->node_flags = nodemap->nodes[pnn].flags;
3076
3077         /* count how many active nodes there are */
3078         rec->num_active    = 0;
3079         rec->num_connected = 0;
3080         for (i=0; i<nodemap->num; i++) {
3081                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3082                         rec->num_active++;
3083                 }
3084                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3085                         rec->num_connected++;
3086                 }
3087         }
3088
3089
3090         /* verify that the recmaster node is still active */
3091         for (j=0; j<nodemap->num; j++) {
3092                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3093                         break;
3094                 }
3095         }
3096
3097         if (j == nodemap->num) {
3098                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3099                 force_election(rec, pnn, nodemap);
3100                 return;
3101         }
3102
3103         /* if recovery master is disconnected we must elect a new recmaster */
3104         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3105                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3106                 force_election(rec, pnn, nodemap);
3107                 return;
3108         }
3109
3110         /* grap the nodemap from the recovery master to check if it is banned */
3111         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3112                                    mem_ctx, &recmaster_nodemap);
3113         if (ret != 0) {
3114                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3115                           nodemap->nodes[j].pnn));
3116                 return;
3117         }
3118
3119
3120         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3121                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3122                 force_election(rec, pnn, nodemap);
3123                 return;
3124         }
3125
3126
3127         /* verify that we have all ip addresses we should have and we dont
3128          * have addresses we shouldnt have.
3129          */ 
3130         if (ctdb->tunable.disable_ip_failover == 0) {
3131                 if (rec->ip_check_disable_ctx == NULL) {
3132                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3133                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3134                         }
3135                 }
3136         }
3137
3138
3139         /* if we are not the recmaster then we do not need to check
3140            if recovery is needed
3141          */
3142         if (pnn != rec->recmaster) {
3143                 return;
3144         }
3145
3146
3147         /* ensure our local copies of flags are right */
3148         ret = update_local_flags(rec, nodemap);
3149         if (ret == MONITOR_ELECTION_NEEDED) {
3150                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3151                 force_election(rec, pnn, nodemap);
3152                 return;
3153         }
3154         if (ret != MONITOR_OK) {
3155                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3156                 return;
3157         }
3158
3159         if (ctdb->num_nodes != nodemap->num) {
3160                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3161                 reload_nodes_file(ctdb);
3162                 return;
3163         }
3164
3165         /* verify that all active nodes agree that we are the recmaster */
3166         switch (verify_recmaster(rec, nodemap, pnn)) {
3167         case MONITOR_RECOVERY_NEEDED:
3168                 /* can not happen */
3169                 return;
3170         case MONITOR_ELECTION_NEEDED:
3171                 force_election(rec, pnn, nodemap);
3172                 return;
3173         case MONITOR_OK:
3174                 break;
3175         case MONITOR_FAILED:
3176                 return;
3177         }
3178
3179
3180         if (rec->need_recovery) {
3181                 /* a previous recovery didn't finish */
3182                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                 return;
3184         }
3185
3186         /* verify that all active nodes are in normal mode 
3187            and not in recovery mode 
3188         */
3189         switch (verify_recmode(ctdb, nodemap)) {
3190         case MONITOR_RECOVERY_NEEDED:
3191                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3192                 return;
3193         case MONITOR_FAILED:
3194                 return;
3195         case MONITOR_ELECTION_NEEDED:
3196                 /* can not happen */
3197         case MONITOR_OK:
3198                 break;
3199         }
3200
3201
3202         if (ctdb->tunable.verify_recovery_lock != 0) {
3203                 /* we should have the reclock - check its not stale */
3204                 ret = check_recovery_lock(ctdb);
3205                 if (ret != 0) {
3206                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3207                         ctdb_set_culprit(rec, ctdb->pnn);
3208                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3209                         return;
3210                 }
3211         }
3212
3213         /* if there are takeovers requested, perform it and notify the waiters */
3214         if (rec->reallocate_callers) {
3215                 process_ipreallocate_requests(ctdb, rec);
3216         }
3217
3218         /* get the nodemap for all active remote nodes
3219          */
3220         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3221         if (remote_nodemaps == NULL) {
3222                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3223                 return;
3224         }
3225         for(i=0; i<nodemap->num; i++) {
3226                 remote_nodemaps[i] = NULL;
3227         }
3228         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3229                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3230                 return;
3231         } 
3232
3233         /* verify that all other nodes have the same nodemap as we have
3234         */
3235         for (j=0; j<nodemap->num; j++) {
3236                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3237                         continue;
3238                 }
3239
3240                 if (remote_nodemaps[j] == NULL) {
3241                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3242                         ctdb_set_culprit(rec, j);
3243
3244                         return;
3245                 }
3246
3247                 /* if the nodes disagree on how many nodes there are
3248                    then this is a good reason to try recovery
3249                  */
3250                 if (remote_nodemaps[j]->num != nodemap->num) {
3251                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3252                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3253                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3254                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3255                         return;
3256                 }
3257
3258                 /* if the nodes disagree on which nodes exist and are
3259                    active, then that is also a good reason to do recovery
3260                  */
3261                 for (i=0;i<nodemap->num;i++) {
3262                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3263                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3264                                           nodemap->nodes[j].pnn, i, 
3265                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3266                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3267                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3268                                             vnnmap);
3269                                 return;
3270                         }
3271                 }
3272
3273                 /* verify the flags are consistent
3274                 */
3275                 for (i=0; i<nodemap->num; i++) {
3276                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3277                                 continue;
3278                         }
3279                         
3280                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3281                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3282                                   nodemap->nodes[j].pnn, 
3283                                   nodemap->nodes[i].pnn, 
3284                                   remote_nodemaps[j]->nodes[i].flags,
3285                                   nodemap->nodes[j].flags));
3286                                 if (i == j) {
3287                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3288                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3289                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3290                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3291                                                     vnnmap);
3292                                         return;
3293                                 } else {
3294                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3295                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3296                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3297                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3298                                                     vnnmap);
3299                                         return;
3300                                 }
3301                         }
3302                 }
3303         }
3304
3305
3306         /* there better be the same number of lmasters in the vnn map
3307            as there are active nodes or we will have to do a recovery
3308          */
3309         if (vnnmap->size != rec->num_active) {
3310                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3311                           vnnmap->size, rec->num_active));
3312                 ctdb_set_culprit(rec, ctdb->pnn);
3313                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3314                 return;
3315         }
3316
3317         /* verify that all active nodes in the nodemap also exist in 
3318            the vnnmap.
3319          */
3320         for (j=0; j<nodemap->num; j++) {
3321                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3322                         continue;
3323                 }
3324                 if (nodemap->nodes[j].pnn == pnn) {
3325                         continue;
3326                 }
3327
3328                 for (i=0; i<vnnmap->size; i++) {
3329                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3330                                 break;
3331                         }
3332                 }
3333                 if (i == vnnmap->size) {
3334                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3335                                   nodemap->nodes[j].pnn));
3336                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3337                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3338                         return;
3339                 }
3340         }
3341
3342         
3343         /* verify that all other nodes have the same vnnmap
3344            and are from the same generation
3345          */
3346         for (j=0; j<nodemap->num; j++) {
3347                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3348                         continue;
3349                 }
3350                 if (nodemap->nodes[j].pnn == pnn) {
3351                         continue;
3352                 }
3353
3354                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3355                                           mem_ctx, &remote_vnnmap);
3356                 if (ret != 0) {
3357                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3358                                   nodemap->nodes[j].pnn));
3359                         return;
3360                 }
3361
3362                 /* verify the vnnmap generation is the same */
3363                 if (vnnmap->generation != remote_vnnmap->generation) {
3364                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3365                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3366                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3367                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3368                         return;
3369                 }
3370
3371                 /* verify the vnnmap size is the same */
3372                 if (vnnmap->size != remote_vnnmap->size) {
3373                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3374                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3375                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3376                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3377                         return;
3378                 }
3379
3380                 /* verify the vnnmap is the same */
3381                 for (i=0;i<vnnmap->size;i++) {
3382                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3383                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3384                                           nodemap->nodes[j].pnn));
3385                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3386                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3387                                             vnnmap);
3388                                 return;
3389                         }
3390                 }
3391         }
3392
3393         /* we might need to change who has what IP assigned */
3394         if (rec->need_takeover_run) {
3395                 uint32_t culprit = (uint32_t)-1;
3396
3397                 rec->need_takeover_run = false;
3398
3399                 /* update the list of public ips that a node can handle for
3400                    all connected nodes
3401                 */
3402                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3403                 if (ret != 0) {
3404                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3405                                          culprit));
3406                         rec->need_takeover_run = true;
3407                         return;
3408                 }
3409
3410                 /* execute the "startrecovery" event script on all nodes */
3411                 ret = run_startrecovery_eventscript(rec, nodemap);
3412                 if (ret!=0) {
3413                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3414                         ctdb_set_culprit(rec, ctdb->pnn);
3415                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3416                         return;
3417                 }
3418
3419                 ret = ctdb_takeover_run(ctdb, nodemap);
3420                 if (ret != 0) {
3421                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3422                         return;
3423                 }
3424
3425                 /* execute the "recovered" event script on all nodes */
3426                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3427 #if 0
3428 // we cant check whether the event completed successfully
3429 // since this script WILL fail if the node is in recovery mode
3430 // and if that race happens, the code here would just cause a second
3431 // cascading recovery.
3432                 if (ret!=0) {
3433                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3434                         ctdb_set_culprit(rec, ctdb->pnn);
3435                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3436                 }
3437 #endif
3438         }
3439 }
3440
3441 /*
3442   the main monitoring loop
3443  */
3444 static void monitor_cluster(struct ctdb_context *ctdb)
3445 {
3446         struct ctdb_recoverd *rec;
3447
3448         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3449
3450         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3451         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3452
3453         rec->ctdb = ctdb;
3454
3455         rec->priority_time = timeval_current();
3456
3457         /* register a message port for sending memory dumps */
3458         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3459
3460         /* register a message port for recovery elections */
3461         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3462
3463         /* when nodes are disabled/enabled */
3464         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3465
3466         /* when we are asked to puch out a flag change */
3467         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3468
3469         /* register a message port for vacuum fetch */
3470         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3471
3472         /* register a message port for reloadnodes  */
3473         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3474
3475         /* register a message port for performing a takeover run */
3476         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3477
3478         /* register a message port for disabling the ip check for a short while */
3479         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3480
3481         /* register a message port for updating the recovery daemons node assignment for an ip */
3482         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3483
3484         for (;;) {
3485                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3486                 struct timeval start;
3487                 double elapsed;
3488
3489                 if (!mem_ctx) {
3490                         DEBUG(DEBUG_CRIT,(__location__
3491                                           " Failed to create temp context\n"));
3492                         exit(-1);
3493                 }
3494
3495                 start = timeval_current();
3496                 main_loop(ctdb, rec, mem_ctx);
3497                 talloc_free(mem_ctx);
3498
3499                 /* we only check for recovery once every second */
3500                 elapsed = timeval_elapsed(&start);
3501                 if (elapsed < ctdb->tunable.recover_interval) {
3502                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3503                                           - elapsed);
3504                 }
3505         }
3506 }
3507
3508 /*
3509   event handler for when the main ctdbd dies
3510  */
3511 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3512                                  uint16_t flags, void *private_data)
3513 {
3514         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3515         _exit(1);
3516 }
3517
3518 /*
3519   called regularly to verify that the recovery daemon is still running
3520  */
3521 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3522                               struct timeval yt, void *p)
3523 {
3524         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3525
3526         if (kill(ctdb->recoverd_pid, 0) != 0) {
3527                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3528
3529                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3530                                 ctdb_restart_recd, ctdb);
3531
3532                 return;
3533         }
3534
3535         event_add_timed(ctdb->ev, ctdb, 
3536                         timeval_current_ofs(30, 0),
3537                         ctdb_check_recd, ctdb);
3538 }
3539
3540 static void recd_sig_child_handler(struct event_context *ev,
3541         struct signal_event *se, int signum, int count,
3542         void *dont_care, 
3543         void *private_data)
3544 {
3545 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3546         int status;
3547         pid_t pid = -1;
3548
3549         while (pid != 0) {
3550                 pid = waitpid(-1, &status, WNOHANG);
3551                 if (pid == -1) {
3552                         if (errno != ECHILD) {
3553                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3554                         }
3555                         return;
3556                 }
3557                 if (pid > 0) {
3558                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3559                 }
3560         }
3561 }
3562
3563 /*
3564   startup the recovery daemon as a child of the main ctdb daemon
3565  */
3566 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3567 {
3568         int fd[2];
3569         struct signal_event *se;
3570         struct tevent_fd *fde;
3571
3572         if (pipe(fd) != 0) {
3573                 return -1;
3574         }
3575
3576         ctdb->ctdbd_pid = getpid();
3577
3578         ctdb->recoverd_pid = fork();
3579         if (ctdb->recoverd_pid == -1) {
3580                 return -1;
3581         }
3582         
3583         if (ctdb->recoverd_pid != 0) {
3584                 close(fd[0]);
3585                 event_add_timed(ctdb->ev, ctdb, 
3586                                 timeval_current_ofs(30, 0),
3587                                 ctdb_check_recd, ctdb);
3588                 return 0;
3589         }
3590
3591         close(fd[1]);
3592
3593         srandom(getpid() ^ time(NULL));
3594
3595         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3596                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3597                 exit(1);
3598         }
3599
3600         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3601
3602         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3603                      ctdb_recoverd_parent, &fd[0]);     
3604         tevent_fd_set_auto_close(fde);
3605
3606         /* set up a handler to pick up sigchld */
3607         se = event_add_signal(ctdb->ev, ctdb,
3608                                      SIGCHLD, 0,
3609                                      recd_sig_child_handler,
3610                                      ctdb);
3611         if (se == NULL) {
3612                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3613                 exit(1);
3614         }
3615
3616         monitor_cluster(ctdb);
3617
3618         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3619         return -1;
3620 }
3621
3622 /*
3623   shutdown the recovery daemon
3624  */
3625 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3626 {
3627         if (ctdb->recoverd_pid == 0) {
3628                 return;
3629         }
3630
3631         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3632         kill(ctdb->recoverd_pid, SIGTERM);
3633 }
3634
3635 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3636                        struct timeval t, void *private_data)
3637 {
3638         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3639
3640         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3641         ctdb_stop_recoverd(ctdb);
3642         ctdb_start_recoverd(ctdb);
3643 }