e1f580a4fb2bc9114e3a078e67ce0c005977b476
[kai/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38 #include "ctdb_logging.h"
39
40 #include "common/system.h"
41 #include "common/cmdline.h"
42 #include "common/common.h"
43
44
/* List of SRVID requests that need to be processed.
 *
 * Each element is one node of a doubly-linked list (next/prev are the
 * link fields manipulated through DLIST_ADD) and refers to a single
 * queued request. */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};
50
/* Container for a queue of SRVID requests; list elements are allocated
 * as talloc children of this structure by srvid_request_add(). */
struct srvid_requests {
        struct srvid_list *requests;    /* list head, NULL while empty */
};
54
55 static void srvid_request_reply(struct ctdb_context *ctdb,
56                                 struct srvid_request *request,
57                                 TDB_DATA result)
58 {
59         /* Someone that sent srvid==0 does not want a reply */
60         if (request->srvid == 0) {
61                 talloc_free(request);
62                 return;
63         }
64
65         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
66                                      result) == 0) {
67                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
68                                   (unsigned)request->pnn,
69                                   (unsigned long long)request->srvid));
70         } else {
71                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
72                                  (unsigned)request->pnn,
73                                  (unsigned long long)request->srvid));
74         }
75
76         talloc_free(request);
77 }
78
79 static void srvid_requests_reply(struct ctdb_context *ctdb,
80                                  struct srvid_requests **requests,
81                                  TDB_DATA result)
82 {
83         struct srvid_list *r;
84
85         for (r = (*requests)->requests; r != NULL; r = r->next) {
86                 srvid_request_reply(ctdb, r->request, result);
87         }
88
89         /* Free the list structure... */
90         TALLOC_FREE(*requests);
91 }
92
93 static void srvid_request_add(struct ctdb_context *ctdb,
94                               struct srvid_requests **requests,
95                               struct srvid_request *request)
96 {
97         struct srvid_list *t;
98         int32_t ret;
99         TDB_DATA result;
100
101         if (*requests == NULL) {
102                 *requests = talloc_zero(ctdb, struct srvid_requests);
103                 if (*requests == NULL) {
104                         goto nomem;
105                 }
106         }
107
108         t = talloc_zero(*requests, struct srvid_list);
109         if (t == NULL) {
110                 /* If *requests was just allocated above then free it */
111                 if ((*requests)->requests == NULL) {
112                         TALLOC_FREE(*requests);
113                 }
114                 goto nomem;
115         }
116
117         t->request = (struct srvid_request *)talloc_steal(t, request);
118         DLIST_ADD((*requests)->requests, t);
119
120         return;
121
122 nomem:
123         /* Failed to add the request to the list.  Send a fail. */
124         DEBUG(DEBUG_ERR, (__location__
125                           " Out of memory, failed to queue SRVID request\n"));
126         ret = -ENOMEM;
127         result.dsize = sizeof(ret);
128         result.dptr = (uint8_t *)&ret;
129         srvid_request_reply(ctdb, request, result);
130 }
131
/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        /* Pending re-enable timer; the operation counts as disabled
         * for as long as this is non-NULL */
        struct tevent_timer *timer;
        /* Set by ctdb_op_begin(), cleared by ctdb_op_end() */
        bool in_progress;
        /* Name of the operation, used in log messages */
        const char *name;
};
139
140 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
141 {
142         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
143
144         if (state != NULL) {
145                 state->in_progress = false;
146                 state->name = name;
147         }
148
149         return state;
150 }
151
152 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
153 {
154         return state->timer != NULL;
155 }
156
157 static bool ctdb_op_begin(struct ctdb_op_state *state)
158 {
159         if (ctdb_op_is_disabled(state)) {
160                 DEBUG(DEBUG_NOTICE,
161                       ("Unable to begin - %s are disabled\n", state->name));
162                 return false;
163         }
164
165         state->in_progress = true;
166         return true;
167 }
168
169 static bool ctdb_op_end(struct ctdb_op_state *state)
170 {
171         return state->in_progress = false;
172 }
173
174 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
175 {
176         return state->in_progress;
177 }
178
179 static void ctdb_op_enable(struct ctdb_op_state *state)
180 {
181         TALLOC_FREE(state->timer);
182 }
183
184 static void ctdb_op_timeout_handler(struct tevent_context *ev,
185                                     struct tevent_timer *te,
186                                     struct timeval yt, void *p)
187 {
188         struct ctdb_op_state *state =
189                 talloc_get_type(p, struct ctdb_op_state);
190
191         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
192         ctdb_op_enable(state);
193 }
194
195 static int ctdb_op_disable(struct ctdb_op_state *state,
196                            struct tevent_context *ev,
197                            uint32_t timeout)
198 {
199         if (timeout == 0) {
200                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
201                 ctdb_op_enable(state);
202                 return 0;
203         }
204
205         if (state->in_progress) {
206                 DEBUG(DEBUG_ERR,
207                       ("Unable to disable %s - in progress\n", state->name));
208                 return -EAGAIN;
209         }
210
211         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
212                             state->name, timeout));
213
214         /* Clear any old timers */
215         talloc_free(state->timer);
216
217         /* Arrange for the timeout to occur */
218         state->timer = tevent_add_timer(ev, state,
219                                         timeval_current_ofs(timeout, 0),
220                                         ctdb_op_timeout_handler, state);
221         if (state->timer == NULL) {
222                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
223                 return -ENOMEM;
224         }
225
226         return 0;
227 }
228
/* Per-node misbehaviour record maintained by ctdb_set_culprit_count() */
struct ctdb_banning_state {
        /* Accumulated ban credits; reset to 0 once the node has not
         * been reported for longer than the recovery grace period */
        uint32_t count;
        /* Time at which credits were last added */
        struct timeval last_reported_time;
};
233
/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        /* Node most recently given ban credits by
         * ctdb_set_culprit_count() */
        uint32_t last_culprit_node;
        /* Node map; its num field is used as the number of ban credits
         * by the freeze and transaction-start failure callbacks */
        struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        /* Tested against NODE_FLAGS_INACTIVE before blaming other
         * nodes - presumably the flags of the local node (confirm) */
        uint32_t node_flags;
        struct tevent_timer *send_election_te;
        struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_control_get_ifaces *ifaces;
        uint32_t *force_rebalance_nodes;
        /* Capabilities last fetched by update_capabilities(); kept as a
         * talloc child of this structure */
        struct ctdb_node_capabilities *caps;
};
255
/* Timeouts, relative to the current time, built from tunables.  Both
 * macros expect a variable named "ctdb" to be in scope where they are
 * used. */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
258
/* Forward declaration; the function has the same signature as the
 * tevent timer handlers in this file and, being static, is defined
 * elsewhere in this file. */
static void ctdb_restart_recd(struct tevent_context *ev,
                              struct tevent_timer *te, struct timeval t,
                              void *private_data);
262
263 /*
264   ban a node for a period of time
265  */
266 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
267 {
268         int ret;
269         struct ctdb_context *ctdb = rec->ctdb;
270         struct ctdb_ban_time bantime;
271        
272         if (!ctdb_validate_pnn(ctdb, pnn)) {
273                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
274                 return;
275         }
276
277         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
278
279         bantime.pnn  = pnn;
280         bantime.time = ban_time;
281
282         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
283         if (ret != 0) {
284                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
285                 return;
286         }
287
288 }
289
/* NOTE(review): looks like the possible outcomes of a monitoring
 * check; the code using these values is not in this part of the file -
 * confirm against the callers. */
enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
291
292
293 /*
294   remember the trouble maker
295  */
296 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
297 {
298         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
299         struct ctdb_banning_state *ban_state;
300
301         if (culprit > ctdb->num_nodes) {
302                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
303                 return;
304         }
305
306         /* If we are banned or stopped, do not set other nodes as culprits */
307         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
308                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
309                 return;
310         }
311
312         if (ctdb->nodes[culprit]->ban_state == NULL) {
313                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
314                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
315
316                 
317         }
318         ban_state = ctdb->nodes[culprit]->ban_state;
319         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
320                 /* this was the first time in a long while this node
321                    misbehaved so we will forgive any old transgressions.
322                 */
323                 ban_state->count = 0;
324         }
325
326         ban_state->count += count;
327         ban_state->last_reported_time = timeval_current();
328         rec->last_culprit_node = culprit;
329 }
330
/*
  Blame a node for a single failure: shorthand for adding exactly one
  ban credit through ctdb_set_culprit_count().
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        const uint32_t one_credit = 1;

        ctdb_set_culprit_count(rec, culprit, one_credit);
}
338
339
340 /* this callback is called for every node that failed to execute the
341    recovered event
342 */
343 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
344 {
345         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
346
347         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
348
349         ctdb_set_culprit(rec, node_pnn);
350 }
351
352 /*
353   run the "recovered" eventscript on all nodes
354  */
355 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
356 {
357         TALLOC_CTX *tmp_ctx;
358         uint32_t *nodes;
359         struct ctdb_context *ctdb = rec->ctdb;
360
361         tmp_ctx = talloc_new(ctdb);
362         CTDB_NO_MEMORY(ctdb, tmp_ctx);
363
364         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
365         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
366                                         nodes, 0,
367                                         CONTROL_TIMEOUT(), false, tdb_null,
368                                         NULL, recovered_fail_callback,
369                                         rec) != 0) {
370                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
371
372                 talloc_free(tmp_ctx);
373                 return -1;
374         }
375
376         talloc_free(tmp_ctx);
377         return 0;
378 }
379
380 /* this callback is called for every node that failed to execute the
381    start recovery event
382 */
383 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
384 {
385         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
386
387         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
388
389         ctdb_set_culprit(rec, node_pnn);
390 }
391
392 /*
393   run the "startrecovery" eventscript on all nodes
394  */
395 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
396 {
397         TALLOC_CTX *tmp_ctx;
398         uint32_t *nodes;
399         struct ctdb_context *ctdb = rec->ctdb;
400
401         tmp_ctx = talloc_new(ctdb);
402         CTDB_NO_MEMORY(ctdb, tmp_ctx);
403
404         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
406                                         nodes, 0,
407                                         CONTROL_TIMEOUT(), false, tdb_null,
408                                         NULL,
409                                         startrecovery_fail_callback,
410                                         rec) != 0) {
411                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
412                 talloc_free(tmp_ctx);
413                 return -1;
414         }
415
416         talloc_free(tmp_ctx);
417         return 0;
418 }
419
420 /*
421   update the node capabilities for all connected nodes
422  */
423 static int update_capabilities(struct ctdb_recoverd *rec,
424                                struct ctdb_node_map_old *nodemap)
425 {
426         uint32_t *capp;
427         TALLOC_CTX *tmp_ctx;
428         struct ctdb_node_capabilities *caps;
429         struct ctdb_context *ctdb = rec->ctdb;
430
431         tmp_ctx = talloc_new(rec);
432         CTDB_NO_MEMORY(ctdb, tmp_ctx);
433
434         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
435                                      CONTROL_TIMEOUT(), nodemap);
436
437         if (caps == NULL) {
438                 DEBUG(DEBUG_ERR,
439                       (__location__ " Failed to get node capabilities\n"));
440                 talloc_free(tmp_ctx);
441                 return -1;
442         }
443
444         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
445         if (capp == NULL) {
446                 DEBUG(DEBUG_ERR,
447                       (__location__
448                        " Capabilities don't include current node.\n"));
449                 talloc_free(tmp_ctx);
450                 return -1;
451         }
452         ctdb->capabilities = *capp;
453
454         TALLOC_FREE(rec->caps);
455         rec->caps = talloc_steal(rec, caps);
456
457         talloc_free(tmp_ctx);
458         return 0;
459 }
460
461 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
462 {
463         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
464
465         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
466         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
467 }
468
469 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
470 {
471         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
472
473         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
474         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
475 }
476
477 /*
478   change recovery mode on all nodes
479  */
480 static int set_recovery_mode(struct ctdb_context *ctdb,
481                              struct ctdb_recoverd *rec,
482                              struct ctdb_node_map_old *nodemap,
483                              uint32_t rec_mode, bool freeze)
484 {
485         TDB_DATA data;
486         uint32_t *nodes;
487         TALLOC_CTX *tmp_ctx;
488
489         tmp_ctx = talloc_new(ctdb);
490         CTDB_NO_MEMORY(ctdb, tmp_ctx);
491
492         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
493
494         data.dsize = sizeof(uint32_t);
495         data.dptr = (unsigned char *)&rec_mode;
496
497         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
498                                         nodes, 0,
499                                         CONTROL_TIMEOUT(),
500                                         false, data,
501                                         NULL, NULL,
502                                         NULL) != 0) {
503                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
504                 talloc_free(tmp_ctx);
505                 return -1;
506         }
507
508         /* freeze all nodes */
509         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
510                 int i;
511
512                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
513                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
514                                                 nodes, i,
515                                                 CONTROL_TIMEOUT(),
516                                                 false, tdb_null,
517                                                 NULL,
518                                                 set_recmode_fail_callback,
519                                                 rec) != 0) {
520                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
521                                 talloc_free(tmp_ctx);
522                                 return -1;
523                         }
524                 }
525         }
526
527         talloc_free(tmp_ctx);
528         return 0;
529 }
530
531 /*
532   change recovery master on all node
533  */
534 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn)
535 {
536         TDB_DATA data;
537         TALLOC_CTX *tmp_ctx;
538         uint32_t *nodes;
539
540         tmp_ctx = talloc_new(ctdb);
541         CTDB_NO_MEMORY(ctdb, tmp_ctx);
542
543         data.dsize = sizeof(uint32_t);
544         data.dptr = (unsigned char *)&pnn;
545
546         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
547         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
548                                         nodes, 0,
549                                         CONTROL_TIMEOUT(), false, data,
550                                         NULL, NULL,
551                                         NULL) != 0) {
552                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
553                 talloc_free(tmp_ctx);
554                 return -1;
555         }
556
557         talloc_free(tmp_ctx);
558         return 0;
559 }
560
561 /* update all remote nodes to use the same db priority that we have
562    this can fail if the remove node has not yet been upgraded to 
563    support this function, so we always return success and never fail
564    a recovery if this call fails.
565 */
566 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
567         struct ctdb_node_map_old *nodemap, 
568         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
569 {
570         int db;
571
572         /* step through all local databases */
573         for (db=0; db<dbmap->num;db++) {
574                 struct ctdb_db_priority db_prio;
575                 int ret;
576
577                 db_prio.db_id     = dbmap->dbs[db].db_id;
578                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
579                 if (ret != 0) {
580                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
581                         continue;
582                 }
583
584                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
585
586                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
587                                                 CTDB_CURRENT_NODE, &db_prio);
588                 if (ret != 0) {
589                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
590                                          db_prio.db_id));
591                 }
592         }
593
594         return 0;
595 }                       
596
597 /*
598   ensure all other nodes have attached to any databases that we have
599  */
600 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
601                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
602 {
603         int i, j, db, ret;
604         struct ctdb_dbid_map_old *remote_dbmap;
605
606         /* verify that all other nodes have all our databases */
607         for (j=0; j<nodemap->num; j++) {
608                 /* we dont need to ourself ourselves */
609                 if (nodemap->nodes[j].pnn == pnn) {
610                         continue;
611                 }
612                 /* dont check nodes that are unavailable */
613                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
614                         continue;
615                 }
616
617                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
618                                          mem_ctx, &remote_dbmap);
619                 if (ret != 0) {
620                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
621                         return -1;
622                 }
623
624                 /* step through all local databases */
625                 for (db=0; db<dbmap->num;db++) {
626                         const char *name;
627
628
629                         for (i=0;i<remote_dbmap->num;i++) {
630                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
631                                         break;
632                                 }
633                         }
634                         /* the remote node already have this database */
635                         if (i!=remote_dbmap->num) {
636                                 continue;
637                         }
638                         /* ok so we need to create this database */
639                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
640                                                   dbmap->dbs[db].db_id, mem_ctx,
641                                                   &name);
642                         if (ret != 0) {
643                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
644                                 return -1;
645                         }
646                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
647                                                  nodemap->nodes[j].pnn,
648                                                  mem_ctx, name,
649                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
650                         if (ret != 0) {
651                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
652                                 return -1;
653                         }
654                 }
655         }
656
657         return 0;
658 }
659
660
661 /*
662   ensure we are attached to any databases that anyone else is attached to
663  */
664 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
665                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
666 {
667         int i, j, db, ret;
668         struct ctdb_dbid_map_old *remote_dbmap;
669
670         /* verify that we have all database any other node has */
671         for (j=0; j<nodemap->num; j++) {
672                 /* we dont need to ourself ourselves */
673                 if (nodemap->nodes[j].pnn == pnn) {
674                         continue;
675                 }
676                 /* dont check nodes that are unavailable */
677                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
678                         continue;
679                 }
680
681                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
682                                          mem_ctx, &remote_dbmap);
683                 if (ret != 0) {
684                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
685                         return -1;
686                 }
687
688                 /* step through all databases on the remote node */
689                 for (db=0; db<remote_dbmap->num;db++) {
690                         const char *name;
691
692                         for (i=0;i<(*dbmap)->num;i++) {
693                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
694                                         break;
695                                 }
696                         }
697                         /* we already have this db locally */
698                         if (i!=(*dbmap)->num) {
699                                 continue;
700                         }
701                         /* ok so we need to create this database and
702                            rebuild dbmap
703                          */
704                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
705                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
706                         if (ret != 0) {
707                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
708                                           nodemap->nodes[j].pnn));
709                                 return -1;
710                         }
711                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
712                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
713                         if (ret != 0) {
714                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
715                                 return -1;
716                         }
717                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
718                         if (ret != 0) {
719                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
720                                 return -1;
721                         }
722                 }
723         }
724
725         return 0;
726 }
727
728
729 /*
730   pull the remote database contents from one node into the recdb
731  */
732 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
733                                     struct tdb_wrap *recdb, uint32_t dbid)
734 {
735         int ret;
736         TDB_DATA outdata;
737         struct ctdb_marshall_buffer *reply;
738         struct ctdb_rec_data_old *recdata;
739         int i;
740         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
741
742         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
743                                CONTROL_TIMEOUT(), &outdata);
744         if (ret != 0) {
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
746                 talloc_free(tmp_ctx);
747                 return -1;
748         }
749
750         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
751
752         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
753                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
754                 talloc_free(tmp_ctx);
755                 return -1;
756         }
757
758         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
759
760         for (i=0;
761              i<reply->count;
762              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
763                 TDB_DATA key, data;
764                 struct ctdb_ltdb_header *hdr;
765                 TDB_DATA existing;
766
767                 key.dptr = &recdata->data[0];
768                 key.dsize = recdata->keylen;
769                 data.dptr = &recdata->data[key.dsize];
770                 data.dsize = recdata->datalen;
771
772                 hdr = (struct ctdb_ltdb_header *)data.dptr;
773
774                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
775                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
776                         talloc_free(tmp_ctx);
777                         return -1;
778                 }
779
780                 /* fetch the existing record, if any */
781                 existing = tdb_fetch(recdb->tdb, key);
782
783                 if (existing.dptr != NULL) {
784                         struct ctdb_ltdb_header header;
785                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
786                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
787                                          (unsigned)existing.dsize, srcnode));
788                                 free(existing.dptr);
789                                 talloc_free(tmp_ctx);
790                                 return -1;
791                         }
792                         header = *(struct ctdb_ltdb_header *)existing.dptr;
793                         free(existing.dptr);
794                         if (!(header.rsn < hdr->rsn ||
795                               (header.dmaster != ctdb_get_pnn(ctdb) &&
796                                header.rsn == hdr->rsn))) {
797                                 continue;
798                         }
799                 }
800
801                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
802                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
803                         talloc_free(tmp_ctx);
804                         return -1;
805                 }
806         }
807
808         talloc_free(tmp_ctx);
809
810         return 0;
811 }
812
813
/*
  state shared between the GET_DB_SEQNUM callbacks while the active
  nodes are scanned for the highest database sequence number
 */
struct pull_seqnum_cbdata {
        /* non-zero once any node has failed; later replies are ignored */
        int failed;

        /* node that reported the best seqnum so far; the scan starts
           with this set to -1, meaning "no node chosen yet" */
        uint32_t pnn;

        /* best sequence number seen so far */
        uint64_t seqnum;
};
819
820 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
821 {
822         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
823         uint64_t seqnum;
824
825         if (cb_data->failed != 0) {
826                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
827                 return;
828         }
829
830         if (res != 0) {
831                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
832                 cb_data->failed = 1;
833                 return;
834         }
835
836         if (outdata.dsize != sizeof(uint64_t)) {
837                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
838                 cb_data->failed = -1;
839                 return;
840         }
841
842         seqnum = *((uint64_t *)outdata.dptr);
843
844         if (seqnum > cb_data->seqnum ||
845             (cb_data->pnn == -1 && seqnum == 0)) {
846                 cb_data->seqnum = seqnum;
847                 cb_data->pnn = node_pnn;
848         }
849 }
850
851 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
852 {
853         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
854
855         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
856         cb_data->failed = 1;
857 }
858
859 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
860                                 struct ctdb_recoverd *rec, 
861                                 struct ctdb_node_map_old *nodemap, 
862                                 struct tdb_wrap *recdb, uint32_t dbid)
863 {
864         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
865         uint32_t *nodes;
866         TDB_DATA data;
867         uint32_t outdata[2];
868         struct pull_seqnum_cbdata *cb_data;
869
870         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
871
872         outdata[0] = dbid;
873         outdata[1] = 0;
874
875         data.dsize = sizeof(outdata);
876         data.dptr  = (uint8_t *)&outdata[0];
877
878         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
879         if (cb_data == NULL) {
880                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
881                 talloc_free(tmp_ctx);
882                 return -1;
883         }
884
885         cb_data->failed = 0;
886         cb_data->pnn    = -1;
887         cb_data->seqnum = 0;
888         
889         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
890         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
891                                         nodes, 0,
892                                         CONTROL_TIMEOUT(), false, data,
893                                         pull_seqnum_cb,
894                                         pull_seqnum_fail_cb,
895                                         cb_data) != 0) {
896                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
897
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (cb_data->failed != 0) {
903                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
904                 talloc_free(tmp_ctx);
905                 return -1;
906         }
907
908         if (cb_data->pnn == -1) {
909                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
910                 talloc_free(tmp_ctx);
911                 return -1;
912         }
913
914         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
915
916         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
917                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
918                 talloc_free(tmp_ctx);
919                 return -1;
920         }
921
922         talloc_free(tmp_ctx);
923         return 0;
924 }
925
926
927 /*
928   pull all the remote database contents into the recdb
929  */
930 static int pull_remote_database(struct ctdb_context *ctdb,
931                                 struct ctdb_recoverd *rec, 
932                                 struct ctdb_node_map_old *nodemap, 
933                                 struct tdb_wrap *recdb, uint32_t dbid,
934                                 bool persistent)
935 {
936         int j;
937
938         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
939                 int ret;
940                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
941                 if (ret == 0) {
942                         return 0;
943                 }
944         }
945
946         /* pull all records from all other nodes across onto this node
947            (this merges based on rsn)
948         */
949         for (j=0; j<nodemap->num; j++) {
950                 /* dont merge from nodes that are unavailable */
951                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
952                         continue;
953                 }
954                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
955                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
956                                  nodemap->nodes[j].pnn));
957                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
958                         return -1;
959                 }
960         }
961         
962         return 0;
963 }
964
965
966 /*
967   update flags on all active nodes
968  */
969 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
970 {
971         int ret;
972
973         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
974                 if (ret != 0) {
975                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
976                 return -1;
977         }
978
979         return 0;
980 }
981
982 /*
983   ensure all nodes have the same vnnmap we do
984  */
985 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
986                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
987 {
988         int j, ret;
989
990         /* push the new vnn map out to all the nodes */
991         for (j=0; j<nodemap->num; j++) {
992                 /* dont push to nodes that are unavailable */
993                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
994                         continue;
995                 }
996
997                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
998                 if (ret != 0) {
999                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1000                         return -1;
1001                 }
1002         }
1003
1004         return 0;
1005 }
1006
1007
/*
  completion callback for a vacuum fetch call: nothing is done with the
  result, the call state is simply released
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}
1015
1016
1017 /**
1018  * Process one elements of the vacuum fetch list:
1019  * Migrate it over to us with the special flag
1020  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
1021  */
1022 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
1023                                      uint32_t pnn,
1024                                      struct ctdb_rec_data_old *r)
1025 {
1026         struct ctdb_client_call_state *state;
1027         TDB_DATA data;
1028         struct ctdb_ltdb_header *hdr;
1029         struct ctdb_call call;
1030
1031         ZERO_STRUCT(call);
1032         call.call_id = CTDB_NULL_FUNC;
1033         call.flags = CTDB_IMMEDIATE_MIGRATION;
1034         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1035
1036         call.key.dptr = &r->data[0];
1037         call.key.dsize = r->keylen;
1038
1039         /* ensure we don't block this daemon - just skip a record if we can't get
1040            the chainlock */
1041         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1042                 return true;
1043         }
1044
1045         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1046         if (data.dptr == NULL) {
1047                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1048                 return true;
1049         }
1050
1051         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1052                 free(data.dptr);
1053                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1054                 return true;
1055         }
1056
1057         hdr = (struct ctdb_ltdb_header *)data.dptr;
1058         if (hdr->dmaster == pnn) {
1059                 /* its already local */
1060                 free(data.dptr);
1061                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1062                 return true;
1063         }
1064
1065         free(data.dptr);
1066
1067         state = ctdb_call_send(ctdb_db, &call);
1068         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1069         if (state == NULL) {
1070                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1071                 return false;
1072         }
1073         state->async.fn = vacuum_fetch_callback;
1074         state->async.private_data = NULL;
1075
1076         return true;
1077 }
1078
1079
1080 /*
1081   handler for vacuum fetch
1082 */
1083 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1084                                  void *private_data)
1085 {
1086         struct ctdb_recoverd *rec = talloc_get_type(
1087                 private_data, struct ctdb_recoverd);
1088         struct ctdb_context *ctdb = rec->ctdb;
1089         struct ctdb_marshall_buffer *recs;
1090         int ret, i;
1091         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1092         const char *name;
1093         struct ctdb_dbid_map_old *dbmap=NULL;
1094         bool persistent = false;
1095         struct ctdb_db_context *ctdb_db;
1096         struct ctdb_rec_data_old *r;
1097
1098         recs = (struct ctdb_marshall_buffer *)data.dptr;
1099
1100         if (recs->count == 0) {
1101                 goto done;
1102         }
1103
1104         /* work out if the database is persistent */
1105         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1106         if (ret != 0) {
1107                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1108                 goto done;
1109         }
1110
1111         for (i=0;i<dbmap->num;i++) {
1112                 if (dbmap->dbs[i].db_id == recs->db_id) {
1113                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1114                         break;
1115                 }
1116         }
1117         if (i == dbmap->num) {
1118                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1119                 goto done;
1120         }
1121
1122         /* find the name of this database */
1123         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1125                 goto done;
1126         }
1127
1128         /* attach to it */
1129         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1130         if (ctdb_db == NULL) {
1131                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1132                 goto done;
1133         }
1134
1135         r = (struct ctdb_rec_data_old *)&recs->data[0];
1136         while (recs->count) {
1137                 bool ok;
1138
1139                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1140                 if (!ok) {
1141                         break;
1142                 }
1143
1144                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1145                 recs->count--;
1146         }
1147
1148 done:
1149         talloc_free(tmp_ctx);
1150 }
1151
1152
1153 /*
1154  * handler for database detach
1155  */
1156 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1157                                     void *private_data)
1158 {
1159         struct ctdb_recoverd *rec = talloc_get_type(
1160                 private_data, struct ctdb_recoverd);
1161         struct ctdb_context *ctdb = rec->ctdb;
1162         uint32_t db_id;
1163         struct ctdb_db_context *ctdb_db;
1164
1165         if (data.dsize != sizeof(db_id)) {
1166                 return;
1167         }
1168         db_id = *(uint32_t *)data.dptr;
1169
1170         ctdb_db = find_ctdb_db(ctdb, db_id);
1171         if (ctdb_db == NULL) {
1172                 /* database is not attached */
1173                 return;
1174         }
1175
1176         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1177
1178         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1179                              ctdb_db->db_name));
1180         talloc_free(ctdb_db);
1181 }
1182
/*
  timer callback used by ctdb_wait_timeout(): p points at a uint32_t
  flag, which is set to 1 to signal that the wait is over
 */
static void ctdb_wait_handler(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval yt, void *p)
{
        uint32_t *expired = p;

        *expired = 1;
}
1193
1194 /*
1195   wait for a given number of seconds
1196  */
1197 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1198 {
1199         uint32_t timed_out = 0;
1200         time_t usecs = (secs - (time_t)secs) * 1000000;
1201         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1202                          ctdb_wait_handler, &timed_out);
1203         while (!timed_out) {
1204                 tevent_loop_once(ctdb->ev);
1205         }
1206 }
1207
1208 /*
1209   called when an election times out (ends)
1210  */
1211 static void ctdb_election_timeout(struct tevent_context *ev,
1212                                   struct tevent_timer *te,
1213                                   struct timeval t, void *p)
1214 {
1215         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1216         rec->election_timeout = NULL;
1217         fast_start = false;
1218
1219         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1220 }
1221
1222
1223 /*
1224   wait for an election to finish. It finished election_timeout seconds after
1225   the last election packet is received
1226  */
1227 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1228 {
1229         struct ctdb_context *ctdb = rec->ctdb;
1230         while (rec->election_timeout) {
1231                 tevent_loop_once(ctdb->ev);
1232         }
1233 }
1234
1235 /*
1236   Update our local flags from all remote connected nodes. 
1237   This is only run when we are or we belive we are the recovery master
1238  */
1239 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1240 {
1241         int j;
1242         struct ctdb_context *ctdb = rec->ctdb;
1243         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1244
1245         /* get the nodemap for all active remote nodes and verify
1246            they are the same as for this node
1247          */
1248         for (j=0; j<nodemap->num; j++) {
1249                 struct ctdb_node_map_old *remote_nodemap=NULL;
1250                 int ret;
1251
1252                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1253                         continue;
1254                 }
1255                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1256                         continue;
1257                 }
1258
1259                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1260                                            mem_ctx, &remote_nodemap);
1261                 if (ret != 0) {
1262                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1263                                   nodemap->nodes[j].pnn));
1264                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1265                         talloc_free(mem_ctx);
1266                         return MONITOR_FAILED;
1267                 }
1268                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1269                         /* We should tell our daemon about this so it
1270                            updates its flags or else we will log the same 
1271                            message again in the next iteration of recovery.
1272                            Since we are the recovery master we can just as
1273                            well update the flags on all nodes.
1274                         */
1275                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1276                         if (ret != 0) {
1277                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1278                                 return -1;
1279                         }
1280
1281                         /* Update our local copy of the flags in the recovery
1282                            daemon.
1283                         */
1284                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1285                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1286                                  nodemap->nodes[j].flags));
1287                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1288                 }
1289                 talloc_free(remote_nodemap);
1290         }
1291         talloc_free(mem_ctx);
1292         return MONITOR_OK;
1293 }
1294
1295
1296 /* Create a new random generation id.
1297    The generation id can not be the INVALID_GENERATION id
1298 */
1299 static uint32_t new_generation(void)
1300 {
1301         uint32_t generation;
1302
1303         while (1) {
1304                 generation = random();
1305
1306                 if (generation != INVALID_GENERATION) {
1307                         break;
1308                 }
1309         }
1310
1311         return generation;
1312 }
1313
1314
1315 /*
1316   create a temporary working database
1317  */
1318 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1319 {
1320         char *name;
1321         struct tdb_wrap *recdb;
1322         unsigned tdb_flags;
1323
1324         /* open up the temporary recovery database */
1325         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1326                                ctdb->db_directory_state,
1327                                ctdb->pnn);
1328         if (name == NULL) {
1329                 return NULL;
1330         }
1331         unlink(name);
1332
1333         tdb_flags = TDB_NOLOCK;
1334         if (ctdb->valgrinding) {
1335                 tdb_flags |= TDB_NOMMAP;
1336         }
1337         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1338
1339         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1340                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1341         if (recdb == NULL) {
1342                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1343         }
1344
1345         talloc_free(name);
1346
1347         return recdb;
1348 }
1349
1350
/*
   state handed to traverse_recdb() while the relevant records of the
   recdb are collected into one marshall buffer
 */
struct recdb_data {
        /* daemon context; supplies our pnn and the tunables */
        struct ctdb_context *ctdb;

        /* marshall buffer being filled; may move when it is grown */
        struct ctdb_marshall_buffer *recdata;

        /* number of bytes of recdata in use */
        uint32_t len;

        /* number of bytes currently allocated for recdata */
        uint32_t allocated_len;

        /* set by the traverse function when marshalling or growing fails */
        bool failed;

        /* true when the database being pushed is persistent */
        bool persistent;
};
1362
1363 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1364 {
1365         struct recdb_data *params = (struct recdb_data *)p;
1366         struct ctdb_rec_data_old *recdata;
1367         struct ctdb_ltdb_header *hdr;
1368
1369         /*
1370          * skip empty records - but NOT for persistent databases:
1371          *
1372          * The record-by-record mode of recovery deletes empty records.
1373          * For persistent databases, this can lead to data corruption
1374          * by deleting records that should be there:
1375          *
1376          * - Assume the cluster has been running for a while.
1377          *
1378          * - A record R in a persistent database has been created and
1379          *   deleted a couple of times, the last operation being deletion,
1380          *   leaving an empty record with a high RSN, say 10.
1381          *
1382          * - Now a node N is turned off.
1383          *
1384          * - This leaves the local database copy of D on N with the empty
1385          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1386          *   the copy of record R.
1387          *
1388          * - Now the record is created again while node N is turned off.
1389          *   This creates R with RSN = 1 on all nodes except for N.
1390          *
1391          * - Now node N is turned on again. The following recovery will chose
1392          *   the older empty copy of R due to RSN 10 > RSN 1.
1393          *
1394          * ==> Hence the record is gone after the recovery.
1395          *
1396          * On databases like Samba's registry, this can damage the higher-level
1397          * data structures built from the various tdb-level records.
1398          */
1399         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1400                 return 0;
1401         }
1402
1403         /* update the dmaster field to point to us */
1404         hdr = (struct ctdb_ltdb_header *)data.dptr;
1405         if (!params->persistent) {
1406                 hdr->dmaster = params->ctdb->pnn;
1407                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1408         }
1409
1410         /* add the record to the blob ready to send to the nodes */
1411         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1412         if (recdata == NULL) {
1413                 params->failed = true;
1414                 return -1;
1415         }
1416         if (params->len + recdata->length >= params->allocated_len) {
1417                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1418                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1419         }
1420         if (params->recdata == NULL) {
1421                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1422                          recdata->length + params->len));
1423                 params->failed = true;
1424                 return -1;
1425         }
1426         params->recdata->count++;
1427         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1428         params->len += recdata->length;
1429         talloc_free(recdata);
1430
1431         return 0;
1432 }
1433
1434 /*
1435   push the recdb database out to all nodes
1436  */
1437 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1438                                bool persistent,
1439                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1440 {
1441         struct recdb_data params;
1442         struct ctdb_marshall_buffer *recdata;
1443         TDB_DATA outdata;
1444         TALLOC_CTX *tmp_ctx;
1445         uint32_t *nodes;
1446
1447         tmp_ctx = talloc_new(ctdb);
1448         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1449
1450         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1451         CTDB_NO_MEMORY(ctdb, recdata);
1452
1453         recdata->db_id = dbid;
1454
1455         params.ctdb = ctdb;
1456         params.recdata = recdata;
1457         params.len = offsetof(struct ctdb_marshall_buffer, data);
1458         params.allocated_len = params.len;
1459         params.failed = false;
1460         params.persistent = persistent;
1461
1462         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1463                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1464                 talloc_free(params.recdata);
1465                 talloc_free(tmp_ctx);
1466                 return -1;
1467         }
1468
1469         if (params.failed) {
1470                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1471                 talloc_free(params.recdata);
1472                 talloc_free(tmp_ctx);
1473                 return -1;              
1474         }
1475
1476         recdata = params.recdata;
1477
1478         outdata.dptr = (void *)recdata;
1479         outdata.dsize = params.len;
1480
1481         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1482         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1483                                         nodes, 0,
1484                                         CONTROL_TIMEOUT(), false, outdata,
1485                                         NULL, NULL,
1486                                         NULL) != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1488                 talloc_free(recdata);
1489                 talloc_free(tmp_ctx);
1490                 return -1;
1491         }
1492
1493         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1494                   dbid, recdata->count));
1495
1496         talloc_free(recdata);
1497         talloc_free(tmp_ctx);
1498
1499         return 0;
1500 }
1501
1502
1503 /*
1504   go through a full recovery on one database 
1505  */
1506 static int recover_database(struct ctdb_recoverd *rec, 
1507                             TALLOC_CTX *mem_ctx,
1508                             uint32_t dbid,
1509                             bool persistent,
1510                             uint32_t pnn, 
1511                             struct ctdb_node_map_old *nodemap,
1512                             uint32_t transaction_id)
1513 {
1514         struct tdb_wrap *recdb;
1515         int ret;
1516         struct ctdb_context *ctdb = rec->ctdb;
1517         TDB_DATA data;
1518         struct ctdb_control_transdb w;
1519         uint32_t *nodes;
1520
1521         recdb = create_recdb(ctdb, mem_ctx);
1522         if (recdb == NULL) {
1523                 return -1;
1524         }
1525
1526         /* pull all remote databases onto the recdb */
1527         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1528         if (ret != 0) {
1529                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1530                 return -1;
1531         }
1532
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1534
1535         /* wipe all the remote databases. This is safe as we are in a transaction */
1536         w.db_id = dbid;
1537         w.transaction_id = transaction_id;
1538
1539         data.dptr = (void *)&w;
1540         data.dsize = sizeof(w);
1541
1542         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1543         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1544                                         nodes, 0,
1545                                         CONTROL_TIMEOUT(), false, data,
1546                                         NULL, NULL,
1547                                         NULL) != 0) {
1548                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1549                 talloc_free(recdb);
1550                 return -1;
1551         }
1552         
1553         /* push out the correct database. This sets the dmaster and skips 
1554            the empty records */
1555         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1556         if (ret != 0) {
1557                 talloc_free(recdb);
1558                 return -1;
1559         }
1560
1561         /* all done with this database */
1562         talloc_free(recdb);
1563
1564         return 0;
1565 }
1566
1567 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1568                                          struct ctdb_recoverd *rec,
1569                                          struct ctdb_node_map_old *nodemap,
1570                                          uint32_t *culprit)
1571 {
1572         int j;
1573         int ret;
1574
1575         if (ctdb->num_nodes != nodemap->num) {
1576                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1577                                   ctdb->num_nodes, nodemap->num));
1578                 if (culprit) {
1579                         *culprit = ctdb->pnn;
1580                 }
1581                 return -1;
1582         }
1583
1584         for (j=0; j<nodemap->num; j++) {
1585                 /* For readability */
1586                 struct ctdb_node *node = ctdb->nodes[j];
1587
1588                 /* release any existing data */
1589                 if (node->known_public_ips) {
1590                         talloc_free(node->known_public_ips);
1591                         node->known_public_ips = NULL;
1592                 }
1593                 if (node->available_public_ips) {
1594                         talloc_free(node->available_public_ips);
1595                         node->available_public_ips = NULL;
1596                 }
1597
1598                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1599                         continue;
1600                 }
1601
1602                 /* Retrieve the list of known public IPs from the node */
1603                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1604                                         CONTROL_TIMEOUT(),
1605                                         node->pnn,
1606                                         ctdb->nodes,
1607                                         0,
1608                                         &node->known_public_ips);
1609                 if (ret != 0) {
1610                         DEBUG(DEBUG_ERR,
1611                               ("Failed to read known public IPs from node: %u\n",
1612                                node->pnn));
1613                         if (culprit) {
1614                                 *culprit = node->pnn;
1615                         }
1616                         return -1;
1617                 }
1618
1619                 if (ctdb->do_checkpublicip &&
1620                     !ctdb_op_is_disabled(rec->takeover_run) &&
1621                     verify_remote_ip_allocation(ctdb,
1622                                                  node->known_public_ips,
1623                                                  node->pnn)) {
1624                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1625                         rec->need_takeover_run = true;
1626                 }
1627
1628                 /* Retrieve the list of available public IPs from the node */
1629                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1630                                         CONTROL_TIMEOUT(),
1631                                         node->pnn,
1632                                         ctdb->nodes,
1633                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1634                                         &node->available_public_ips);
1635                 if (ret != 0) {
1636                         DEBUG(DEBUG_ERR,
1637                               ("Failed to read available public IPs from node: %u\n",
1638                                node->pnn));
1639                         if (culprit) {
1640                                 *culprit = node->pnn;
1641                         }
1642                         return -1;
1643                 }
1644         }
1645
1646         return 0;
1647 }
1648
1649 /* when we start a recovery, make sure all nodes use the same reclock file
1650    setting
1651 */
1652 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1653 {
1654         struct ctdb_context *ctdb = rec->ctdb;
1655         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1656         TDB_DATA data;
1657         uint32_t *nodes;
1658
1659         if (ctdb->recovery_lock_file == NULL) {
1660                 data.dptr  = NULL;
1661                 data.dsize = 0;
1662         } else {
1663                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1664                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1665         }
1666
1667         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1668         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1669                                         nodes, 0,
1670                                         CONTROL_TIMEOUT(),
1671                                         false, data,
1672                                         NULL, NULL,
1673                                         rec) != 0) {
1674                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1675                 talloc_free(tmp_ctx);
1676                 return -1;
1677         }
1678
1679         talloc_free(tmp_ctx);
1680         return 0;
1681 }
1682
1683
1684 /*
1685  * this callback is called for every node that failed to execute ctdb_takeover_run()
1686  * and set flag to re-run takeover run.
1687  */
1688 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1689 {
1690         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1691
1692         if (callback_data != NULL) {
1693                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1694
1695                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1696
1697                 ctdb_set_culprit(rec, node_pnn);
1698         }
1699 }
1700
1701
1702 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1703 {
1704         struct ctdb_context *ctdb = rec->ctdb;
1705         int i;
1706         struct ctdb_banning_state *ban_state;
1707
1708         *self_ban = false;
1709         for (i=0; i<ctdb->num_nodes; i++) {
1710                 if (ctdb->nodes[i]->ban_state == NULL) {
1711                         continue;
1712                 }
1713                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1714                 if (ban_state->count < 2*ctdb->num_nodes) {
1715                         continue;
1716                 }
1717
1718                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1719                         ctdb->nodes[i]->pnn, ban_state->count,
1720                         ctdb->tunable.recovery_ban_period));
1721                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1722                 ban_state->count = 0;
1723
1724                 /* Banning ourself? */
1725                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1726                         *self_ban = true;
1727                 }
1728         }
1729 }
1730
1731 static bool do_takeover_run(struct ctdb_recoverd *rec,
1732                             struct ctdb_node_map_old *nodemap,
1733                             bool banning_credits_on_fail)
1734 {
1735         uint32_t *nodes = NULL;
1736         struct srvid_request_data dtr;
1737         TDB_DATA data;
1738         int i;
1739         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1740         int ret;
1741         bool ok;
1742
1743         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1744
1745         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1746                 DEBUG(DEBUG_ERR, (__location__
1747                                   " takeover run already in progress \n"));
1748                 ok = false;
1749                 goto done;
1750         }
1751
1752         if (!ctdb_op_begin(rec->takeover_run)) {
1753                 ok = false;
1754                 goto done;
1755         }
1756
1757         /* Disable IP checks (takeover runs, really) on other nodes
1758          * while doing this takeover run.  This will stop those other
1759          * nodes from triggering takeover runs when think they should
1760          * be hosting an IP but it isn't yet on an interface.  Don't
1761          * wait for replies since a failure here might cause some
1762          * noise in the logs but will not actually cause a problem.
1763          */
1764         dtr.srvid = 0; /* No reply */
1765         dtr.pnn = -1;
1766
1767         data.dptr  = (uint8_t*)&dtr;
1768         data.dsize = sizeof(dtr);
1769
1770         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1771
1772         /* Disable for 60 seconds.  This can be a tunable later if
1773          * necessary.
1774          */
1775         dtr.data = 60;
1776         for (i = 0; i < talloc_array_length(nodes); i++) {
1777                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1778                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1779                                              data) != 0) {
1780                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1781                 }
1782         }
1783
1784         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1785                                 rec->force_rebalance_nodes,
1786                                 takeover_fail_callback,
1787                                 banning_credits_on_fail ? rec : NULL);
1788
1789         /* Reenable takeover runs and IP checks on other nodes */
1790         dtr.data = 0;
1791         for (i = 0; i < talloc_array_length(nodes); i++) {
1792                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1793                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1794                                              data) != 0) {
1795                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1796                 }
1797         }
1798
1799         if (ret != 0) {
1800                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1801                 ok = false;
1802                 goto done;
1803         }
1804
1805         ok = true;
1806         /* Takeover run was successful so clear force rebalance targets */
1807         if (rebalance_nodes == rec->force_rebalance_nodes) {
1808                 TALLOC_FREE(rec->force_rebalance_nodes);
1809         } else {
1810                 DEBUG(DEBUG_WARNING,
1811                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1812         }
1813 done:
1814         rec->need_takeover_run = !ok;
1815         talloc_free(nodes);
1816         ctdb_op_end(rec->takeover_run);
1817
1818         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1819         return ok;
1820 }
1821
/*
 * State shared between db_recovery_parallel() and ctdb_recovery_handler()
 * while waiting for the external recovery helper to report its result.
 */
struct recovery_helper_state {
        int fd[2];      /* pipe: fd[0] is read locally, fd[1] is passed to the helper */
        pid_t pid;      /* pid of the helper, -1 while no helper has been started */
        int result;     /* value read from fd[0]; EPIPE if the read was incomplete */
        bool done;      /* set by ctdb_recovery_handler() once fd[0] was readable */
};
1828
1829 static void ctdb_recovery_handler(struct tevent_context *ev,
1830                                   struct tevent_fd *fde,
1831                                   uint16_t flags, void *private_data)
1832 {
1833         struct recovery_helper_state *state = talloc_get_type_abort(
1834                 private_data, struct recovery_helper_state);
1835         int ret;
1836
1837         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1838         if (ret != sizeof(state->result)) {
1839                 state->result = EPIPE;
1840         }
1841
1842         state->done = true;
1843 }
1844
1845
1846 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1847 {
1848         static char prog[PATH_MAX+1] = "";
1849         const char **args;
1850         struct recovery_helper_state *state;
1851         struct tevent_fd *fde;
1852         int nargs, ret;
1853
1854         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1855                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1856                              "ctdb_recovery_helper")) {
1857                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1858         }
1859
1860         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1861         if (state == NULL) {
1862                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1863                 return -1;
1864         }
1865
1866         state->pid = -1;
1867
1868         ret = pipe(state->fd);
1869         if (ret != 0) {
1870                 DEBUG(DEBUG_ERR,
1871                       ("Failed to create pipe for recovery helper\n"));
1872                 goto fail;
1873         }
1874
1875         set_close_on_exec(state->fd[0]);
1876
1877         nargs = 4;
1878         args = talloc_array(state, const char *, nargs);
1879         if (args == NULL) {
1880                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1881                 goto fail;
1882         }
1883
1884         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1885         args[1] = rec->ctdb->daemon.name;
1886         args[2] = talloc_asprintf(args, "%u", new_generation());
1887         args[3] = NULL;
1888
1889         if (args[0] == NULL || args[2] == NULL) {
1890                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1891                 goto fail;
1892         }
1893
1894         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1895                                      args, NULL, NULL, &state->pid)) {
1896                 DEBUG(DEBUG_ERR,
1897                       ("Failed to create child for recovery helper\n"));
1898                 goto fail;
1899         }
1900
1901         close(state->fd[1]);
1902         state->fd[1] = -1;
1903
1904         state->done = false;
1905
1906         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1907                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1908         if (fde == NULL) {
1909                 goto fail;
1910         }
1911         tevent_fd_set_auto_close(fde);
1912
1913         while (!state->done) {
1914                 tevent_loop_once(rec->ctdb->ev);
1915         }
1916
1917         close(state->fd[0]);
1918         state->fd[0] = -1;
1919
1920         if (state->result != 0) {
1921                 goto fail;
1922         }
1923
1924         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1925         talloc_free(state);
1926         return 0;
1927
1928 fail:
1929         if (state->fd[0] != -1) {
1930                 close(state->fd[0]);
1931         }
1932         if (state->fd[1] != -1) {
1933                 close(state->fd[1]);
1934         }
1935         if (state->pid != -1) {
1936                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1937         }
1938         talloc_free(state);
1939         return -1;
1940 }
1941
/*
 * Run database recovery directly from this daemon, one database after
 * the other.
 *
 * Steps, in order:
 *  - set recovery mode to active on all nodes and run the "startrecovery"
 *    event script
 *  - set a new generation in the vnnmap of node pnn only
 *  - start a transaction on the active nodes, recover every database in
 *    dbmap and commit the transaction
 *  - build a new vnnmap from the active, lmaster-capable nodes and push it
 *    to all nodes
 *  - set the recovery master and switch recovery mode back to normal
 *
 * Returns 0 on success, -1 as soon as any step fails.
 *
 * NOTE(review): the failure returns after the transaction was started
 * (recover_database(), commit and later steps) do not send
 * CTDB_CONTROL_TRANSACTION_CANCEL - presumably the inconsistent generation
 * forces a new recovery; confirm.
 */
static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
                              uint32_t pnn, struct ctdb_node_map_old *nodemap,
                              struct ctdb_vnn_map *vnnmap,
                              struct ctdb_dbid_map_old *dbmap)
{
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t generation;
        TDB_DATA data;
        uint32_t *nodes;
        int ret, i, j;

        /* set recovery mode to active on all nodes */
        ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
        }

        /* execute the "startrecovery" event script on all nodes */
        ret = run_startrecovery_eventscript(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
                return -1;
        }

        /* pick a new generation number */
        generation = new_generation();

        /* change the vnnmap on this node to use the new generation
           number but not on any other nodes.
           this guarantees that if we abort the recovery prematurely
           for some reason (a node stops responding?)
           that we can just return immediately and we will reenter
           recovery shortly again.
           I.e. we deliberately leave the cluster with an inconsistent
           generation id to allow us to abort recovery at any stage and
           just restart it from scratch.
         */
        vnnmap->generation = generation;
        ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                return -1;
        }

        /* Database generations are updated when the transaction is committed
         * to the databases.  So make sure to use the final generation as the
         * transaction id
         */
        generation = new_generation();

        /* data refers to the local generation variable; it is sent with
         * both the transaction start and the transaction commit below
         */
        data.dptr = (void *)&generation;
        data.dsize = sizeof(uint32_t);

        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL,
                                        transaction_start_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                /* best effort: ask the nodes to cancel the transaction again */
                if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        NULL,
                                        NULL) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
                }
                return -1;
        }

        DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));

        /* recover each database in turn, using the final generation */
        for (i=0;i<dbmap->num;i++) {
                ret = recover_database(rec, mem_ctx,
                                       dbmap->dbs[i].db_id,
                                       dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
                                       pnn, nodemap, generation);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
                        return -1;
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));

        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));

        /* build a new vnn map with all the currently active nodes that
           have the lmaster capability; from here on the vnnmap parameter
           refers to this newly allocated map */
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->generation = generation;
        vnnmap->size = 0;
        vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
        CTDB_NO_MEMORY(ctdb, vnnmap->map);
        /* i walks the node map, j is the next free slot in vnnmap->map */
        for (i=j=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (!ctdb_node_has_capabilities(rec->caps,
                                                ctdb->nodes[i]->pnn,
                                                CTDB_CAP_LMASTER)) {
                        /* this node can not be an lmaster */
                        DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
                        continue;
                }

                /* grow the map by one entry and append this node */
                vnnmap->size++;
                vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
                CTDB_NO_MEMORY(ctdb, vnnmap->map);
                vnnmap->map[j++] = nodemap->nodes[i].pnn;

        }
        /* never push an empty map: fall back to the node given by pnn */
        if (vnnmap->size == 0) {
                DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
                vnnmap->size++;
                vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
                CTDB_NO_MEMORY(ctdb, vnnmap->map);
                vnnmap->map[0] = pnn;
        }

        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));

        /* update recmaster to point to us for all nodes */
        ret = set_recovery_master(ctdb, nodemap, pnn);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));

        /* disable recovery mode */
        ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));

        return 0;
}
2105
2106 /*
2107   we are the recmaster, and recovery is needed - start a recovery run
2108  */
2109 static int do_recovery(struct ctdb_recoverd *rec,
2110                        TALLOC_CTX *mem_ctx, uint32_t pnn,
2111                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
2112 {
2113         struct ctdb_context *ctdb = rec->ctdb;
2114         int i, ret;
2115         struct ctdb_dbid_map_old *dbmap;
2116         struct timeval start_time;
2117         uint32_t culprit = (uint32_t)-1;
2118         bool self_ban;
2119         bool par_recovery;
2120
2121         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2122
2123         /* Check if the current node is still the recmaster.  It's possible that
2124          * re-election has changed the recmaster, but we have not yet updated
2125          * that information.
2126          */
2127         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2128                                      pnn, &ctdb->recovery_master);
2129         if (ret != 0) {
2130                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster\n"));
2131                 return -1;
2132         }
2133
2134         if (pnn != ctdb->recovery_master) {
2135                 DEBUG(DEBUG_NOTICE,
2136                       ("Recovery master changed to %u, aborting recovery\n",
2137                        ctdb->recovery_master));
2138                 return -1;
2139         }
2140
2141         /* if recovery fails, force it again */
2142         rec->need_recovery = true;
2143
2144         if (!ctdb_op_begin(rec->recovery)) {
2145                 return -1;
2146         }
2147
2148         if (rec->election_timeout) {
2149                 /* an election is in progress */
2150                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2151                 goto fail;
2152         }
2153
2154         ban_misbehaving_nodes(rec, &self_ban);
2155         if (self_ban) {
2156                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2157                 goto fail;
2158         }
2159
2160         if (ctdb->recovery_lock_file != NULL) {
2161                 if (ctdb_recovery_have_lock(ctdb)) {
2162                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2163                 } else {
2164                         start_time = timeval_current();
2165                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2166                                              ctdb->recovery_lock_file));
2167                         if (!ctdb_recovery_lock(ctdb)) {
2168                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2169                                         /* If ctdb is trying first recovery, it's
2170                                          * possible that current node does not know
2171                                          * yet who the recmaster is.
2172                                          */
2173                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2174                                                           " - retrying recovery\n"));
2175                                         goto fail;
2176                                 }
2177
2178                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2179                                                  "and ban ourself for %u seconds\n",
2180                                                  ctdb->tunable.recovery_ban_period));
2181                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2182                                 goto fail;
2183                         }
2184                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2185                                                            CONTROL_TIMEOUT(),
2186                                                            timeval_elapsed(&start_time));
2187                         DEBUG(DEBUG_NOTICE,
2188                               ("Recovery lock taken successfully by recovery daemon\n"));
2189                 }
2190         }
2191
2192         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2193
2194         /* get a list of all databases */
2195         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2196         if (ret != 0) {
2197                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
2198                 goto fail;
2199         }
2200
2201         /* we do the db creation before we set the recovery mode, so the freeze happens
2202            on all databases we will be dealing with. */
2203
2204         /* verify that we have all the databases any other node has */
2205         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2206         if (ret != 0) {
2207                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2208                 goto fail;
2209         }
2210
2211         /* verify that all other nodes have all our databases */
2212         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2213         if (ret != 0) {
2214                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2215                 goto fail;
2216         }
2217         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2218
2219         /* update the database priority for all remote databases */
2220         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2221         if (ret != 0) {
2222                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2223         }
2224         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2225
2226
2227         /* update all other nodes to use the same setting for reclock files
2228            as the local recovery master.
2229         */
2230         sync_recovery_lock_file_across_cluster(rec);
2231
2232         /* update the capabilities for all nodes */
2233         ret = update_capabilities(rec, nodemap);
2234         if (ret!=0) {
2235                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2236                 return -1;
2237         }
2238
2239         /*
2240           update all nodes to have the same flags that we have
2241          */
2242         for (i=0;i<nodemap->num;i++) {
2243                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2244                         continue;
2245                 }
2246
2247                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2248                 if (ret != 0) {
2249                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2250                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2251                         } else {
2252                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2253                                 return -1;
2254                         }
2255                 }
2256         }
2257
2258         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2259
2260         /* Check if all participating nodes have parallel recovery capability */
2261         par_recovery = true;
2262         for (i=0; i<nodemap->num; i++) {
2263                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2264                         continue;
2265                 }
2266
2267                 if (!(rec->caps[i].capabilities &
2268                       CTDB_CAP_PARALLEL_RECOVERY)) {
2269                         par_recovery = false;
2270                         break;
2271                 }
2272         }
2273
2274         if (par_recovery) {
2275                 ret = db_recovery_parallel(rec, mem_ctx);
2276         } else {
2277                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2278                                          dbmap);
2279         }
2280
2281         if (ret != 0) {
2282                 goto fail;
2283         }
2284
2285         /* Fetch known/available public IPs from each active node */
2286         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2289                                  culprit));
2290                 rec->need_takeover_run = true;
2291                 goto fail;
2292         }
2293
2294         do_takeover_run(rec, nodemap, false);
2295
2296         /* execute the "recovered" event script on all nodes */
2297         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2298         if (ret!=0) {
2299                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2300                 goto fail;
2301         }
2302
2303         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2304
2305         /* send a message to all clients telling them that the cluster 
2306            has been reconfigured */
2307         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2308                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2309         if (ret != 0) {
2310                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2311                 goto fail;
2312         }
2313
2314         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2315
2316         rec->need_recovery = false;
2317         ctdb_op_end(rec->recovery);
2318
2319         /* we managed to complete a full recovery, make sure to forgive
2320            any past sins by the nodes that could now participate in the
2321            recovery.
2322         */
2323         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2324         for (i=0;i<nodemap->num;i++) {
2325                 struct ctdb_banning_state *ban_state;
2326
2327                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2328                         continue;
2329                 }
2330
2331                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2332                 if (ban_state == NULL) {
2333                         continue;
2334                 }
2335
2336                 ban_state->count = 0;
2337         }
2338
2339         /* We just finished a recovery successfully.
2340            We now wait for rerecovery_timeout before we allow
2341            another recovery to take place.
2342         */
2343         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2344         ctdb_op_disable(rec->recovery, ctdb->ev,
2345                         ctdb->tunable.rerecovery_timeout);
2346         return 0;
2347
2348 fail:
2349         ctdb_op_end(rec->recovery);
2350         return -1;
2351 }
2352
2353
/*
  Election data exchanged between recovery daemons.

  ctdb_election_data() fills this in for the local node and
  send_election_request() ships the raw struct as the message payload,
  so the field order and types are part of the message layout.

  Note: ctdb_election_win() in this file decides on node_flags, then
  priority_time, then pnn.  num_connected is carried in the message
  but is not compared there.
 */
struct election_message {
        uint32_t num_connected;         /* nodes not flagged DISCONNECTED in the sender's nodemap */
        struct timeval priority_time;   /* sender's rec->priority_time */
        uint32_t pnn;                   /* sender's node number */
        uint32_t node_flags;            /* sender's own flags, taken from its nodemap */
};
2364
2365 /*
2366   form this nodes election data
2367  */
2368 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2369 {
2370         int ret, i;
2371         struct ctdb_node_map_old *nodemap;
2372         struct ctdb_context *ctdb = rec->ctdb;
2373
2374         ZERO_STRUCTP(em);
2375
2376         em->pnn = rec->ctdb->pnn;
2377         em->priority_time = rec->priority_time;
2378
2379         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2380         if (ret != 0) {
2381                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2382                 return;
2383         }
2384
2385         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2386         em->node_flags = rec->node_flags;
2387
2388         for (i=0;i<nodemap->num;i++) {
2389                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2390                         em->num_connected++;
2391                 }
2392         }
2393
2394         /* we shouldnt try to win this election if we cant be a recmaster */
2395         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2396                 em->num_connected = 0;
2397                 em->priority_time = timeval_current();
2398         }
2399
2400         talloc_free(nodemap);
2401 }
2402
2403 /*
2404   see if the given election data wins
2405  */
2406 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2407 {
2408         struct election_message myem;
2409         int cmp = 0;
2410
2411         ctdb_election_data(rec, &myem);
2412
2413         /* we cant win if we dont have the recmaster capability */
2414         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2415                 return false;
2416         }
2417
2418         /* we cant win if we are banned */
2419         if (rec->node_flags & NODE_FLAGS_BANNED) {
2420                 return false;
2421         }
2422
2423         /* we cant win if we are stopped */
2424         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2425                 return false;
2426         }
2427
2428         /* we will automatically win if the other node is banned */
2429         if (em->node_flags & NODE_FLAGS_BANNED) {
2430                 return true;
2431         }
2432
2433         /* we will automatically win if the other node is banned */
2434         if (em->node_flags & NODE_FLAGS_STOPPED) {
2435                 return true;
2436         }
2437
2438         /* then the longest running node */
2439         if (cmp == 0) {
2440                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2441         }
2442
2443         if (cmp == 0) {
2444                 cmp = (int)myem.pnn - (int)em->pnn;
2445         }
2446
2447         return cmp > 0;
2448 }
2449
2450 /*
2451   send out an election request
2452  */
2453 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2454 {
2455         int ret;
2456         TDB_DATA election_data;
2457         struct election_message emsg;
2458         uint64_t srvid;
2459         struct ctdb_context *ctdb = rec->ctdb;
2460
2461         srvid = CTDB_SRVID_RECOVERY;
2462
2463         ctdb_election_data(rec, &emsg);
2464
2465         election_data.dsize = sizeof(struct election_message);
2466         election_data.dptr  = (unsigned char *)&emsg;
2467
2468
2469         /* first we assume we will win the election and set 
2470            recoverymaster to be ourself on the current node
2471          */
2472         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2473         if (ret != 0) {
2474                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2475                 return -1;
2476         }
2477
2478
2479         /* send an election message to all active nodes */
2480         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2481         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2482 }
2483
2484 /*
2485   this function will unban all nodes in the cluster
2486 */
2487 static void unban_all_nodes(struct ctdb_context *ctdb)
2488 {
2489         int ret, i;
2490         struct ctdb_node_map_old *nodemap;
2491         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2492         
2493         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2494         if (ret != 0) {
2495                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2496                 return;
2497         }
2498
2499         for (i=0;i<nodemap->num;i++) {
2500                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2501                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2502                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2503                                                  nodemap->nodes[i].pnn, 0,
2504                                                  NODE_FLAGS_BANNED);
2505                         if (ret != 0) {
2506                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2507                         }
2508                 }
2509         }
2510
2511         talloc_free(tmp_ctx);
2512 }
2513
2514
2515 /*
2516   we think we are winning the election - send a broadcast election request
2517  */
2518 static void election_send_request(struct tevent_context *ev,
2519                                   struct tevent_timer *te,
2520                                   struct timeval t, void *p)
2521 {
2522         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2523         int ret;
2524
2525         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2526         if (ret != 0) {
2527                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2528         }
2529
2530         talloc_free(rec->send_election_te);
2531         rec->send_election_te = NULL;
2532 }
2533
2534 /*
2535   handler for memory dumps
2536 */
2537 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2538 {
2539         struct ctdb_recoverd *rec = talloc_get_type(
2540                 private_data, struct ctdb_recoverd);
2541         struct ctdb_context *ctdb = rec->ctdb;
2542         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2543         TDB_DATA *dump;
2544         int ret;
2545         struct srvid_request *rd;
2546
2547         if (data.dsize != sizeof(struct srvid_request)) {
2548                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2549                 talloc_free(tmp_ctx);
2550                 return;
2551         }
2552         rd = (struct srvid_request *)data.dptr;
2553
2554         dump = talloc_zero(tmp_ctx, TDB_DATA);
2555         if (dump == NULL) {
2556                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2557                 talloc_free(tmp_ctx);
2558                 return;
2559         }
2560         ret = ctdb_dump_memory(ctdb, dump);
2561         if (ret != 0) {
2562                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2563                 talloc_free(tmp_ctx);
2564                 return;
2565         }
2566
2567 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2568
2569         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2570         if (ret != 0) {
2571                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2572                 talloc_free(tmp_ctx);
2573                 return;
2574         }
2575
2576         talloc_free(tmp_ctx);
2577 }
2578
2579 /*
2580   handler for reload_nodes
2581 */
2582 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2583                                  void *private_data)
2584 {
2585         struct ctdb_recoverd *rec = talloc_get_type(
2586                 private_data, struct ctdb_recoverd);
2587
2588         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2589
2590         ctdb_load_nodes_file(rec->ctdb);
2591 }
2592
2593
2594 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2595                                    struct tevent_timer *te,
2596                                    struct timeval t, void *p)
2597 {
2598         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2599
2600         if (rec->force_rebalance_nodes == NULL) {
2601                 DEBUG(DEBUG_ERR,
2602                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2603                 return;
2604         }
2605
2606         DEBUG(DEBUG_NOTICE,
2607               ("Rebalance timeout occurred - do takeover run\n"));
2608         do_takeover_run(rec, rec->nodemap, false);
2609 }
2610
2611
2612 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2613                                         void *private_data)
2614 {
2615         struct ctdb_recoverd *rec = talloc_get_type(
2616                 private_data, struct ctdb_recoverd);
2617         struct ctdb_context *ctdb = rec->ctdb;
2618         uint32_t pnn;
2619         uint32_t *t;
2620         int len;
2621         uint32_t deferred_rebalance;
2622
2623         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2624                 return;
2625         }
2626
2627         if (data.dsize != sizeof(uint32_t)) {
2628                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2629                 return;
2630         }
2631
2632         pnn = *(uint32_t *)&data.dptr[0];
2633
2634         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2635
2636         /* Copy any existing list of nodes.  There's probably some
2637          * sort of realloc variant that will do this but we need to
2638          * make sure that freeing the old array also cancels the timer
2639          * event for the timeout... not sure if realloc will do that.
2640          */
2641         len = (rec->force_rebalance_nodes != NULL) ?
2642                 talloc_array_length(rec->force_rebalance_nodes) :
2643                 0;
2644
2645         /* This allows duplicates to be added but they don't cause
2646          * harm.  A call to add a duplicate PNN arguably means that
2647          * the timeout should be reset, so this is the simplest
2648          * solution.
2649          */
2650         t = talloc_zero_array(rec, uint32_t, len+1);
2651         CTDB_NO_MEMORY_VOID(ctdb, t);
2652         if (len > 0) {
2653                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2654         }
2655         t[len] = pnn;
2656
2657         talloc_free(rec->force_rebalance_nodes);
2658
2659         rec->force_rebalance_nodes = t;
2660
2661         /* If configured, setup a deferred takeover run to make sure
2662          * that certain nodes get IPs rebalanced to them.  This will
2663          * be cancelled if a successful takeover run happens before
2664          * the timeout.  Assign tunable value to variable for
2665          * readability.
2666          */
2667         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2668         if (deferred_rebalance != 0) {
2669                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2670                                  timeval_current_ofs(deferred_rebalance, 0),
2671                                  ctdb_rebalance_timeout, rec);
2672         }
2673 }
2674
2675
2676
2677 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2678                                    void *private_data)
2679 {
2680         struct ctdb_recoverd *rec = talloc_get_type(
2681                 private_data, struct ctdb_recoverd);
2682         struct ctdb_public_ip *ip;
2683
2684         if (rec->recmaster != rec->ctdb->pnn) {
2685                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2686                 return;
2687         }
2688
2689         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2690                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2691                 return;
2692         }
2693
2694         ip = (struct ctdb_public_ip *)data.dptr;
2695
2696         update_ip_assignment_tree(rec->ctdb, ip);
2697 }
2698
2699 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2700                                     TDB_DATA data,
2701                                     struct ctdb_op_state *op_state)
2702 {
2703         struct srvid_request_data *r;
2704         uint32_t timeout;
2705         TDB_DATA result;
2706         int32_t ret = 0;
2707
2708         /* Validate input data */
2709         if (data.dsize != sizeof(struct srvid_request_data)) {
2710                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2711                                  "expecting %lu\n", (long unsigned)data.dsize,
2712                                  (long unsigned)sizeof(struct srvid_request)));
2713                 return;
2714         }
2715         if (data.dptr == NULL) {
2716                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2717                 return;
2718         }
2719
2720         r = (struct srvid_request_data *)data.dptr;
2721         timeout = r->data;
2722
2723         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2724         if (ret != 0) {
2725                 goto done;
2726         }
2727
2728         /* Returning our PNN tells the caller that we succeeded */
2729         ret = ctdb_get_pnn(ctdb);
2730 done:
2731         result.dsize = sizeof(int32_t);
2732         result.dptr  = (uint8_t *)&ret;
2733         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2734 }
2735
2736 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2737                                           void *private_data)
2738 {
2739         struct ctdb_recoverd *rec = talloc_get_type(
2740                 private_data, struct ctdb_recoverd);
2741
2742         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2743 }
2744
2745 /* Backward compatibility for this SRVID */
2746 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2747                                      void *private_data)
2748 {
2749         struct ctdb_recoverd *rec = talloc_get_type(
2750                 private_data, struct ctdb_recoverd);
2751         uint32_t timeout;
2752
2753         if (data.dsize != sizeof(uint32_t)) {
2754                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2755                                  "expecting %lu\n", (long unsigned)data.dsize,
2756                                  (long unsigned)sizeof(uint32_t)));
2757                 return;
2758         }
2759         if (data.dptr == NULL) {
2760                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2761                 return;
2762         }
2763
2764         timeout = *((uint32_t *)data.dptr);
2765
2766         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2767 }
2768
2769 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2770                                        void *private_data)
2771 {
2772         struct ctdb_recoverd *rec = talloc_get_type(
2773                 private_data, struct ctdb_recoverd);
2774
2775         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2776 }
2777
2778 /*
2779   handler for ip reallocate, just add it to the list of requests and 
2780   handle this later in the monitor_cluster loop so we do not recurse
2781   with other requests to takeover_run()
2782 */
2783 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2784                                   void *private_data)
2785 {
2786         struct srvid_request *request;
2787         struct ctdb_recoverd *rec = talloc_get_type(
2788                 private_data, struct ctdb_recoverd);
2789
2790         if (data.dsize != sizeof(struct srvid_request)) {
2791                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2792                 return;
2793         }
2794
2795         request = (struct srvid_request *)data.dptr;
2796
2797         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2798 }
2799
2800 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2801                                           struct ctdb_recoverd *rec)
2802 {
2803         TDB_DATA result;
2804         int32_t ret;
2805         uint32_t culprit;
2806         struct srvid_requests *current;
2807
2808         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2809
2810         /* Only process requests that are currently pending.  More
2811          * might come in while the takeover run is in progress and
2812          * they will need to be processed later since they might
2813          * be in response flag changes.
2814          */
2815         current = rec->reallocate_requests;
2816         rec->reallocate_requests = NULL;
2817
2818         /* update the list of public ips that a node can handle for
2819            all connected nodes
2820         */
2821         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2822         if (ret != 0) {
2823                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2824                                  culprit));
2825                 rec->need_takeover_run = true;
2826         }
2827         if (ret == 0) {
2828                 if (do_takeover_run(rec, rec->nodemap, false)) {
2829                         ret = ctdb_get_pnn(ctdb);
2830                 } else {
2831                         ret = -1;
2832                 }
2833         }
2834
2835         result.dsize = sizeof(int32_t);
2836         result.dptr  = (uint8_t *)&ret;
2837
2838         srvid_requests_reply(ctdb, &current, result);
2839 }
2840
2841
2842 /*
2843   handler for recovery master elections
2844 */
2845 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2846 {
2847         struct ctdb_recoverd *rec = talloc_get_type(
2848                 private_data, struct ctdb_recoverd);
2849         struct ctdb_context *ctdb = rec->ctdb;
2850         int ret;
2851         struct election_message *em = (struct election_message *)data.dptr;
2852
2853         /* Ignore election packets from ourself */
2854         if (ctdb->pnn == em->pnn) {
2855                 return;
2856         }
2857
2858         /* we got an election packet - update the timeout for the election */
2859         talloc_free(rec->election_timeout);
2860         rec->election_timeout = tevent_add_timer(
2861                         ctdb->ev, ctdb,
2862                         fast_start ?
2863                                 timeval_current_ofs(0, 500000) :
2864                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2865                         ctdb_election_timeout, rec);
2866
2867         /* someone called an election. check their election data
2868            and if we disagree and we would rather be the elected node, 
2869            send a new election message to all other nodes
2870          */
2871         if (ctdb_election_win(rec, em)) {
2872                 if (!rec->send_election_te) {
2873                         rec->send_election_te = tevent_add_timer(
2874                                         ctdb->ev, rec,
2875                                         timeval_current_ofs(0, 500000),
2876                                         election_send_request, rec);
2877                 }
2878                 /*unban_all_nodes(ctdb);*/
2879                 return;
2880         }
2881
2882         /* we didn't win */
2883         TALLOC_FREE(rec->send_election_te);
2884
2885         /* Release the recovery lock file */
2886         if (ctdb_recovery_have_lock(ctdb)) {
2887                 ctdb_recovery_unlock(ctdb);
2888                 unban_all_nodes(ctdb);
2889         }
2890
2891         clear_ip_assignment_tree(ctdb);
2892
2893         /* ok, let that guy become recmaster then */
2894         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2895         if (ret != 0) {
2896                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2897                 return;
2898         }
2899
2900         return;
2901 }
2902
2903
2904 /*
2905   force the start of the election process
2906  */
2907 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2908                            struct ctdb_node_map_old *nodemap)
2909 {
2910         int ret;
2911         struct ctdb_context *ctdb = rec->ctdb;
2912
2913         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2914
2915         /* set all nodes to recovery mode to stop all internode traffic */
2916         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2917         if (ret != 0) {
2918                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2919                 return;
2920         }
2921
2922         talloc_free(rec->election_timeout);
2923         rec->election_timeout = tevent_add_timer(
2924                         ctdb->ev, ctdb,
2925                         fast_start ?
2926                                 timeval_current_ofs(0, 500000) :
2927                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2928                         ctdb_election_timeout, rec);
2929
2930         ret = send_election_request(rec, pnn);
2931         if (ret!=0) {
2932                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2933                 return;
2934         }
2935
2936         /* wait for a few seconds to collect all responses */
2937         ctdb_wait_election(rec);
2938 }
2939
2940
2941
2942 /*
2943   handler for when a node changes its flags
2944 */
2945 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2946 {
2947         struct ctdb_recoverd *rec = talloc_get_type(
2948                 private_data, struct ctdb_recoverd);
2949         struct ctdb_context *ctdb = rec->ctdb;
2950         int ret;
2951         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2952         struct ctdb_node_map_old *nodemap=NULL;
2953         TALLOC_CTX *tmp_ctx;
2954         int i;
2955         int disabled_flag_changed;
2956
2957         if (data.dsize != sizeof(*c)) {
2958                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2959                 return;
2960         }
2961
2962         tmp_ctx = talloc_new(ctdb);
2963         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2964
2965         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2966         if (ret != 0) {
2967                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2968                 talloc_free(tmp_ctx);
2969                 return;         
2970         }
2971
2972
2973         for (i=0;i<nodemap->num;i++) {
2974                 if (nodemap->nodes[i].pnn == c->pnn) break;
2975         }
2976
2977         if (i == nodemap->num) {
2978                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2979                 talloc_free(tmp_ctx);
2980                 return;
2981         }
2982
2983         if (c->old_flags != c->new_flags) {
2984                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2985         }
2986
2987         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2988
2989         nodemap->nodes[i].flags = c->new_flags;
2990
2991         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2992                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2993
2994         if (ret == 0) {
2995                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2996                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2997         }
2998         
2999         if (ret == 0 &&
3000             ctdb->recovery_master == ctdb->pnn &&
3001             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3002                 /* Only do the takeover run if the perm disabled or unhealthy
3003                    flags changed since these will cause an ip failover but not
3004                    a recovery.
3005                    If the node became disconnected or banned this will also
3006                    lead to an ip address failover but that is handled 
3007                    during recovery
3008                 */
3009                 if (disabled_flag_changed) {
3010                         rec->need_takeover_run = true;
3011                 }
3012         }
3013
3014         talloc_free(tmp_ctx);
3015 }
3016
3017 /*
3018   handler for when we need to push out flag changes ot all other nodes
3019 */
3020 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
3021                                void *private_data)
3022 {
3023         struct ctdb_recoverd *rec = talloc_get_type(
3024                 private_data, struct ctdb_recoverd);
3025         struct ctdb_context *ctdb = rec->ctdb;
3026         int ret;
3027         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
3028         struct ctdb_node_map_old *nodemap=NULL;
3029         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3030         uint32_t recmaster;
3031         uint32_t *nodes;
3032
3033         /* find the recovery master */
3034         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
3035         if (ret != 0) {
3036                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3037                 talloc_free(tmp_ctx);
3038                 return;
3039         }
3040
3041         /* read the node flags from the recmaster */
3042         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
3043         if (ret != 0) {
3044                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
3045                 talloc_free(tmp_ctx);
3046                 return;
3047         }
3048         if (c->pnn >= nodemap->num) {
3049                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
3050                 talloc_free(tmp_ctx);
3051                 return;
3052         }
3053
3054         /* send the flags update to all connected nodes */
3055         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3056
3057         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3058                                       nodes, 0, CONTROL_TIMEOUT(),
3059                                       false, data,
3060                                       NULL, NULL,
3061                                       NULL) != 0) {
3062                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
3063
3064                 talloc_free(tmp_ctx);
3065                 return;
3066         }
3067
3068         talloc_free(tmp_ctx);
3069 }
3070
3071
/*
  Shared state between verify_recmode() and its per-node callback
  verify_recmode_normal_callback().
 */
struct verify_recmode_normal_data {
        uint32_t count;                 /* getrecmode controls still outstanding */
        enum monitor_result status;     /* starts as MONITOR_OK; updated by the callback */
};
3076
3077 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
3078 {
3079         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
3080
3081
3082         /* one more node has responded with recmode data*/
3083         rmdata->count--;
3084
3085         /* if we failed to get the recmode, then return an error and let
3086            the main loop try again.
3087         */
3088         if (state->state != CTDB_CONTROL_DONE) {
3089                 if (rmdata->status == MONITOR_OK) {
3090                         rmdata->status = MONITOR_FAILED;
3091                 }
3092                 return;
3093         }
3094
3095         /* if we got a response, then the recmode will be stored in the
3096            status field
3097         */
3098         if (state->status != CTDB_RECOVERY_NORMAL) {
3099                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
3100                 rmdata->status = MONITOR_RECOVERY_NEEDED;
3101         }
3102
3103         return;
3104 }
3105
3106
3107 /* verify that all nodes are in normal recovery mode */
3108 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
3109 {
3110         struct verify_recmode_normal_data *rmdata;
3111         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3112         struct ctdb_client_control_state *state;
3113         enum monitor_result status;
3114         int j;
3115         
3116         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
3117         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3118         rmdata->count  = 0;
3119         rmdata->status = MONITOR_OK;
3120
3121         /* loop over all active nodes and send an async getrecmode call to 
3122            them*/
3123         for (j=0; j<nodemap->num; j++) {
3124                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3125                         continue;
3126                 }
3127                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
3128                                         CONTROL_TIMEOUT(), 
3129                                         nodemap->nodes[j].pnn);
3130                 if (state == NULL) {
3131                         /* we failed to send the control, treat this as 
3132                            an error and try again next iteration
3133                         */                      
3134                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3135                         talloc_free(mem_ctx);
3136                         return MONITOR_FAILED;
3137                 }
3138
3139                 /* set up the callback functions */
3140                 state->async.fn = verify_recmode_normal_callback;
3141                 state->async.private_data = rmdata;
3142
3143                 /* one more control to wait for to complete */
3144                 rmdata->count++;
3145         }
3146
3147
3148         /* now wait for up to the maximum number of seconds allowed
3149            or until all nodes we expect a response from has replied
3150         */
3151         while (rmdata->count > 0) {
3152                 tevent_loop_once(ctdb->ev);
3153         }
3154
3155         status = rmdata->status;
3156         talloc_free(mem_ctx);
3157         return status;
3158 }
3159
3160
3161 struct verify_recmaster_data {
3162         struct ctdb_recoverd *rec;
3163         uint32_t count;
3164         uint32_t pnn;
3165         enum monitor_result status;
3166 };
3167
3168 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3169 {
3170         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3171
3172
3173         /* one more node has responded with recmaster data*/
3174         rmdata->count--;
3175
3176         /* if we failed to get the recmaster, then return an error and let
3177            the main loop try again.
3178         */
3179         if (state->state != CTDB_CONTROL_DONE) {
3180                 if (rmdata->status == MONITOR_OK) {
3181                         rmdata->status = MONITOR_FAILED;
3182                 }
3183                 return;
3184         }
3185
3186         /* if we got a response, then the recmaster will be stored in the
3187            status field
3188         */
3189         if (state->status != rmdata->pnn) {
3190                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3191                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3192                 rmdata->status = MONITOR_ELECTION_NEEDED;
3193         }
3194
3195         return;
3196 }
3197
3198
3199 /* verify that all nodes agree that we are the recmaster */
3200 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3201 {
3202         struct ctdb_context *ctdb = rec->ctdb;
3203         struct verify_recmaster_data *rmdata;
3204         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3205         struct ctdb_client_control_state *state;
3206         enum monitor_result status;
3207         int j;
3208         
3209         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3210         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3211         rmdata->rec    = rec;
3212         rmdata->count  = 0;
3213         rmdata->pnn    = pnn;
3214         rmdata->status = MONITOR_OK;
3215
3216         /* loop over all active nodes and send an async getrecmaster call to 
3217            them*/
3218         for (j=0; j<nodemap->num; j++) {
3219                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3220                         continue;
3221                 }
3222                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3223                                         CONTROL_TIMEOUT(),
3224                                         nodemap->nodes[j].pnn);
3225                 if (state == NULL) {
3226                         /* we failed to send the control, treat this as 
3227                            an error and try again next iteration
3228                         */                      
3229                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3230                         talloc_free(mem_ctx);
3231                         return MONITOR_FAILED;
3232                 }
3233
3234                 /* set up the callback functions */
3235                 state->async.fn = verify_recmaster_callback;
3236                 state->async.private_data = rmdata;
3237
3238                 /* one more control to wait for to complete */
3239                 rmdata->count++;
3240         }
3241
3242
3243         /* now wait for up to the maximum number of seconds allowed
3244            or until all nodes we expect a response from has replied
3245         */
3246         while (rmdata->count > 0) {
3247                 tevent_loop_once(ctdb->ev);
3248         }
3249
3250         status = rmdata->status;
3251         talloc_free(mem_ctx);
3252         return status;
3253 }
3254
3255 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3256                                     struct ctdb_recoverd *rec)
3257 {
3258         struct ctdb_control_get_ifaces *ifaces = NULL;
3259         TALLOC_CTX *mem_ctx;
3260         bool ret = false;
3261
3262         mem_ctx = talloc_new(NULL);
3263
3264         /* Read the interfaces from the local node */
3265         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3266                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3267                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3268                 /* We could return an error.  However, this will be
3269                  * rare so we'll decide that the interfaces have
3270                  * actually changed, just in case.
3271                  */
3272                 talloc_free(mem_ctx);
3273                 return true;
3274         }
3275
3276         if (!rec->ifaces) {
3277                 /* We haven't been here before so things have changed */
3278                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3279                 ret = true;
3280         } else if (rec->ifaces->num != ifaces->num) {
3281                 /* Number of interfaces has changed */
3282                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3283                                      rec->ifaces->num, ifaces->num));
3284                 ret = true;
3285         } else {
3286                 /* See if interface names or link states have changed */
3287                 int i;
3288                 for (i = 0; i < rec->ifaces->num; i++) {
3289                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3290                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3291                                 DEBUG(DEBUG_NOTICE,
3292                                       ("Interface in slot %d changed: %s => %s\n",
3293                                        i, iface->name, ifaces->ifaces[i].name));
3294                                 ret = true;
3295                                 break;
3296                         }
3297                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3298                                 DEBUG(DEBUG_NOTICE,
3299                                       ("Interface %s changed state: %d => %d\n",
3300                                        iface->name, iface->link_state,
3301                                        ifaces->ifaces[i].link_state));
3302                                 ret = true;
3303                                 break;
3304                         }
3305                 }
3306         }
3307
3308         talloc_free(rec->ifaces);
3309         rec->ifaces = talloc_steal(rec, ifaces);
3310
3311         talloc_free(mem_ctx);
3312         return ret;
3313 }
3314
3315 /* called to check that the local allocation of public ip addresses is ok.
3316 */
3317 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3318 {
3319         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3320         struct ctdb_uptime *uptime1 = NULL;
3321         struct ctdb_uptime *uptime2 = NULL;
3322         int ret, j;
3323         bool need_takeover_run = false;
3324
3325         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3326                                 CTDB_CURRENT_NODE, &uptime1);
3327         if (ret != 0) {
3328                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3329                 talloc_free(mem_ctx);
3330                 return -1;
3331         }
3332
3333         if (interfaces_have_changed(ctdb, rec)) {
3334                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3335                                      "local node %u - force takeover run\n",
3336                                      pnn));
3337                 need_takeover_run = true;
3338         }
3339
3340         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3341                                 CTDB_CURRENT_NODE, &uptime2);
3342         if (ret != 0) {
3343                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3344                 talloc_free(mem_ctx);
3345                 return -1;
3346         }
3347
3348         /* skip the check if the startrecovery time has changed */
3349         if (timeval_compare(&uptime1->last_recovery_started,
3350                             &uptime2->last_recovery_started) != 0) {
3351                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3352                 talloc_free(mem_ctx);
3353                 return 0;
3354         }
3355
3356         /* skip the check if the endrecovery time has changed */
3357         if (timeval_compare(&uptime1->last_recovery_finished,
3358                             &uptime2->last_recovery_finished) != 0) {
3359                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3360                 talloc_free(mem_ctx);
3361                 return 0;
3362         }
3363
3364         /* skip the check if we have started but not finished recovery */
3365         if (timeval_compare(&uptime1->last_recovery_finished,
3366                             &uptime1->last_recovery_started) != 1) {
3367                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3368                 talloc_free(mem_ctx);
3369
3370                 return 0;
3371         }
3372
3373         /* verify that we have the ip addresses we should have
3374            and we dont have ones we shouldnt have.
3375            if we find an inconsistency we set recmode to
3376            active on the local node and wait for the recmaster
3377            to do a full blown recovery.
3378            also if the pnn is -1 and we are healthy and can host the ip
3379            we also request a ip reallocation.
3380         */
3381         if (ctdb->tunable.disable_ip_failover == 0) {
3382                 struct ctdb_public_ip_list_old *ips = NULL;
3383
3384                 /* read the *available* IPs from the local node */
3385                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3386                 if (ret != 0) {
3387                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3388                         talloc_free(mem_ctx);
3389                         return -1;
3390                 }
3391
3392                 for (j=0; j<ips->num; j++) {
3393                         if (ips->ips[j].pnn == -1 &&
3394                             nodemap->nodes[pnn].flags == 0) {
3395                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3396                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3397                                 need_takeover_run = true;
3398                         }
3399                 }
3400
3401                 talloc_free(ips);
3402
3403                 /* read the *known* IPs from the local node */
3404                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3405                 if (ret != 0) {
3406                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3407                         talloc_free(mem_ctx);
3408                         return -1;
3409                 }
3410
3411                 for (j=0; j<ips->num; j++) {
3412                         if (ips->ips[j].pnn == pnn) {
3413                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3414                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3415                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3416                                         need_takeover_run = true;
3417                                 }
3418                         } else {
3419                                 if (ctdb->do_checkpublicip &&
3420                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3421
3422                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3423                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3424
3425                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3426                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3427                                         }
3428                                 }
3429                         }
3430                 }
3431         }
3432
3433         if (need_takeover_run) {
3434                 struct srvid_request rd;
3435                 TDB_DATA data;
3436
3437                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3438
3439                 rd.pnn = ctdb->pnn;
3440                 rd.srvid = 0;
3441                 data.dptr = (uint8_t *)&rd;
3442                 data.dsize = sizeof(rd);
3443
3444                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3445                 if (ret != 0) {
3446                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3447                 }
3448         }
3449         talloc_free(mem_ctx);
3450         return 0;
3451 }
3452
3453
3454 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3455 {
3456         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3457
3458         if (node_pnn >= ctdb->num_nodes) {
3459                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3460                 return;
3461         }
3462
3463         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3464
3465 }
3466
3467 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3468         struct ctdb_node_map_old *nodemap,
3469         struct ctdb_node_map_old **remote_nodemaps)
3470 {
3471         uint32_t *nodes;
3472
3473         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3474         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3475                                         nodes, 0,
3476                                         CONTROL_TIMEOUT(), false, tdb_null,
3477                                         async_getnodemap_callback,
3478                                         NULL,
3479                                         remote_nodemaps) != 0) {
3480                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3481
3482                 return -1;
3483         }
3484
3485         return 0;
3486 }
3487
3488 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3489 {
3490         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3491         const char *reclockfile;
3492
3493         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3494                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3495                 talloc_free(tmp_ctx);
3496                 return -1;      
3497         }
3498
3499         if (reclockfile == NULL) {
3500                 if (ctdb->recovery_lock_file != NULL) {
3501                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3502                         talloc_free(ctdb->recovery_lock_file);
3503                         ctdb->recovery_lock_file = NULL;
3504                         ctdb_recovery_unlock(ctdb);
3505                 }
3506                 talloc_free(tmp_ctx);
3507                 return 0;
3508         }
3509
3510         if (ctdb->recovery_lock_file == NULL) {
3511                 DEBUG(DEBUG_NOTICE,
3512                       ("Recovery lock file enabled (%s)\n", reclockfile));
3513                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3514                 ctdb_recovery_unlock(ctdb);
3515                 talloc_free(tmp_ctx);
3516                 return 0;
3517         }
3518
3519
3520         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3521                 talloc_free(tmp_ctx);
3522                 return 0;
3523         }
3524
3525         DEBUG(DEBUG_NOTICE,
3526               ("Recovery lock file changed (now %s)\n", reclockfile));
3527         talloc_free(ctdb->recovery_lock_file);
3528         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3529         ctdb_recovery_unlock(ctdb);
3530
3531         talloc_free(tmp_ctx);
3532         return 0;
3533 }
3534
3535 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3536                       TALLOC_CTX *mem_ctx)
3537 {
3538         uint32_t pnn;
3539         struct ctdb_node_map_old *nodemap=NULL;
3540         struct ctdb_node_map_old *recmaster_nodemap=NULL;
3541         struct ctdb_node_map_old **remote_nodemaps=NULL;
3542         struct ctdb_vnn_map *vnnmap=NULL;
3543         struct ctdb_vnn_map *remote_vnnmap=NULL;
3544         uint32_t num_lmasters;
3545         int32_t debug_level;
3546         int i, j, ret;
3547         bool self_ban;
3548
3549
3550         /* verify that the main daemon is still running */
3551         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3552                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3553                 exit(-1);
3554         }
3555
3556         /* ping the local daemon to tell it we are alive */
3557         ctdb_ctrl_recd_ping(ctdb);
3558
3559         if (rec->election_timeout) {
3560                 /* an election is in progress */
3561                 return;
3562         }
3563
3564         /* read the debug level from the parent and update locally */
3565         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3566         if (ret !=0) {
3567                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3568                 return;
3569         }
3570         DEBUGLEVEL = debug_level;
3571
3572         /* get relevant tunables */
3573         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3574         if (ret != 0) {
3575                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3576                 return;
3577         }
3578
3579         /* get runstate */
3580         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3581                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3582         if (ret != 0) {
3583                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3584                 return;
3585         }
3586
3587         /* get the current recovery lock file from the server */
3588         if (update_recovery_lock_file(ctdb) != 0) {
3589                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3590                 return;
3591         }
3592
3593         /* Make sure that if recovery lock verification becomes disabled when
3594            we close the file
3595         */
3596         if (ctdb->recovery_lock_file == NULL) {
3597                 ctdb_recovery_unlock(ctdb);
3598         }
3599
3600         pnn = ctdb_get_pnn(ctdb);
3601
3602         /* get the vnnmap */
3603         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3604         if (ret != 0) {
3605                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3606                 return;
3607         }
3608
3609
3610         /* get number of nodes */
3611         if (rec->nodemap) {
3612                 talloc_free(rec->nodemap);
3613                 rec->nodemap = NULL;
3614                 nodemap=NULL;
3615         }
3616         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3617         if (ret != 0) {
3618                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3619                 return;
3620         }
3621         nodemap = rec->nodemap;
3622
3623         /* remember our own node flags */
3624         rec->node_flags = nodemap->nodes[pnn].flags;
3625
3626         ban_misbehaving_nodes(rec, &self_ban);
3627         if (self_ban) {
3628                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3629                 return;
3630         }
3631
3632         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3633            also frozen and that the recmode is set to active.
3634         */
3635         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3636                 /* If this node has become inactive then we want to
3637                  * reduce the chances of it taking over the recovery
3638                  * master role when it becomes active again.  This
3639                  * helps to stabilise the recovery master role so that
3640                  * it stays on the most stable node.
3641                  */
3642                 rec->priority_time = timeval_current();
3643
3644                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3645                 if (ret != 0) {
3646                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3647                 }
3648                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3649                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3650
3651                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3652                         if (ret != 0) {
3653                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3654
3655                                 return;
3656                         }
3657                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3658                         if (ret != 0) {
3659                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3660                                 return;
3661                         }
3662                 }
3663
3664                 /* If this node is stopped or banned then it is not the recovery
3665                  * master, so don't do anything. This prevents stopped or banned
3666                  * node from starting election and sending unnecessary controls.
3667                  */
3668                 return;
3669         }
3670
3671         /* check which node is the recovery master */
3672         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3673         if (ret != 0) {
3674                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3675                 return;
3676         }
3677
3678         /* If we are not the recmaster then do some housekeeping */
3679         if (rec->recmaster != pnn) {
3680                 /* Ignore any IP reallocate requests - only recmaster
3681                  * processes them
3682                  */
3683                 TALLOC_FREE(rec->reallocate_requests);
3684                 /* Clear any nodes that should be force rebalanced in
3685                  * the next takeover run.  If the recovery master role
3686                  * has moved then we don't want to process these some
3687                  * time in the future.
3688                  */
3689                 TALLOC_FREE(rec->force_rebalance_nodes);
3690         }
3691
3692         /* This is a special case.  When recovery daemon is started, recmaster
3693          * is set to -1.  If a node is not started in stopped state, then
3694          * start election to decide recovery master
3695          */
3696         if (rec->recmaster == (uint32_t)-1) {
3697                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3698                 force_election(rec, pnn, nodemap);
3699                 return;
3700         }
3701
3702         /* update the capabilities for all nodes */
3703         ret = update_capabilities(rec, nodemap);
3704         if (ret != 0) {
3705                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3706                 return;
3707         }
3708
3709         /*
3710          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3711          * but we have, then force an election and try to become the new
3712          * recmaster.
3713          */
3714         if (!ctdb_node_has_capabilities(rec->caps,
3715                                         rec->recmaster,
3716                                         CTDB_CAP_RECMASTER) &&
3717             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3718             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3719                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3720                                   " but we (node %u) have - force an election\n",
3721                                   rec->recmaster, pnn));
3722                 force_election(rec, pnn, nodemap);
3723                 return;
3724         }
3725
3726         /* verify that the recmaster node is still active */
3727         for (j=0; j<nodemap->num; j++) {
3728                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3729                         break;
3730                 }
3731         }
3732
3733         if (j == nodemap->num) {
3734                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3735                 force_election(rec, pnn, nodemap);
3736                 return;
3737         }
3738
3739         /* if recovery master is disconnected we must elect a new recmaster */
3740         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3741                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3742                 force_election(rec, pnn, nodemap);
3743                 return;
3744         }
3745
3746         /* get nodemap from the recovery master to check if it is inactive */
3747         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3748                                    mem_ctx, &recmaster_nodemap);
3749         if (ret != 0) {
3750                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3751                           nodemap->nodes[j].pnn));
3752                 return;
3753         }
3754
3755
3756         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3757             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3758                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3759                 /*
3760                  * update our nodemap to carry the recmaster's notion of
3761                  * its own flags, so that we don't keep freezing the
3762                  * inactive recmaster node...
3763                  */
3764                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3765                 force_election(rec, pnn, nodemap);
3766                 return;
3767         }
3768
3769         /* verify that we have all ip addresses we should have and we dont
3770          * have addresses we shouldnt have.
3771          */ 
3772         if (ctdb->tunable.disable_ip_failover == 0 &&
3773             !ctdb_op_is_disabled(rec->takeover_run)) {
3774                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3775                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3776                 }
3777         }
3778
3779
3780         /* if we are not the recmaster then we do not need to check
3781            if recovery is needed
3782          */
3783         if (pnn != rec->recmaster) {
3784                 return;
3785         }
3786
3787
3788         /* ensure our local copies of flags are right */
3789         ret = update_local_flags(rec, nodemap);
3790         if (ret == MONITOR_ELECTION_NEEDED) {
3791                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3792                 force_election(rec, pnn, nodemap);
3793                 return;
3794         }
3795         if (ret != MONITOR_OK) {
3796                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3797                 return;
3798         }
3799
3800         if (ctdb->num_nodes != nodemap->num) {
3801                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3802                 ctdb_load_nodes_file(ctdb);
3803                 return;
3804         }
3805
3806         /* verify that all active nodes agree that we are the recmaster */
3807         switch (verify_recmaster(rec, nodemap, pnn)) {
3808         case MONITOR_RECOVERY_NEEDED:
3809                 /* can not happen */
3810                 return;
3811         case MONITOR_ELECTION_NEEDED:
3812                 force_election(rec, pnn, nodemap);
3813                 return;
3814         case MONITOR_OK:
3815                 break;
3816         case MONITOR_FAILED:
3817                 return;
3818         }
3819
3820
3821         if (rec->need_recovery) {
3822                 /* a previous recovery didn't finish */
3823                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3824                 return;
3825         }
3826
3827         /* verify that all active nodes are in normal mode 
3828            and not in recovery mode 
3829         */
3830         switch (verify_recmode(ctdb, nodemap)) {
3831         case MONITOR_RECOVERY_NEEDED:
3832                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3833                 return;
3834         case MONITOR_FAILED:
3835                 return;
3836         case MONITOR_ELECTION_NEEDED:
3837                 /* can not happen */
3838         case MONITOR_OK:
3839                 break;
3840         }
3841
3842
3843         if (ctdb->recovery_lock_file != NULL) {
3844                 /* We must already hold the recovery lock */
3845                 if (!ctdb_recovery_have_lock(ctdb)) {
3846                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3847                         ctdb_set_culprit(rec, ctdb->pnn);
3848                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3849                         return;
3850                 }
3851         }
3852
3853
3854         /* if there are takeovers requested, perform it and notify the waiters */
3855         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3856             rec->reallocate_requests) {
3857                 process_ipreallocate_requests(ctdb, rec);
3858         }
3859
3860         /* If recoveries are disabled then there is no use doing any
3861          * nodemap or flags checks.  Recoveries might be disabled due
3862          * to "reloadnodes", so doing these checks might cause an
3863          * unnecessary recovery.  */
3864         if (ctdb_op_is_disabled(rec->recovery)) {
3865                 return;
3866         }
3867
3868         /* get the nodemap for all active remote nodes
3869          */
3870         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3871         if (remote_nodemaps == NULL) {
3872                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3873                 return;
3874         }
3875         for(i=0; i<nodemap->num; i++) {
3876                 remote_nodemaps[i] = NULL;
3877         }
3878         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3879                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3880                 return;
3881         } 
3882
3883         /* verify that all other nodes have the same nodemap as we have
3884         */
3885         for (j=0; j<nodemap->num; j++) {
3886                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3887                         continue;
3888                 }
3889
3890                 if (remote_nodemaps[j] == NULL) {
3891                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3892                         ctdb_set_culprit(rec, j);
3893
3894                         return;
3895                 }
3896
3897                 /* if the nodes disagree on how many nodes there are
3898                    then this is a good reason to try recovery
3899                  */
3900                 if (remote_nodemaps[j]->num != nodemap->num) {
3901                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3902                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3903                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3904                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3905                         return;
3906                 }
3907
3908                 /* if the nodes disagree on which nodes exist and are
3909                    active, then that is also a good reason to do recovery
3910                  */
3911                 for (i=0;i<nodemap->num;i++) {
3912                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3913                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3914                                           nodemap->nodes[j].pnn, i, 
3915                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3916                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3917                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3918                                             vnnmap);
3919                                 return;
3920                         }
3921                 }
3922         }
3923
3924         /*
3925          * Update node flags obtained from each active node. This ensure we have
3926          * up-to-date information for all the nodes.
3927          */
3928         for (j=0; j<nodemap->num; j++) {
3929                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3930                         continue;
3931                 }
3932                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3933         }
3934
3935         for (j=0; j<nodemap->num; j++) {
3936                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3937                         continue;
3938                 }
3939
3940                 /* verify the flags are consistent
3941                 */
3942                 for (i=0; i<nodemap->num; i++) {
3943                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3944                                 continue;
3945                         }
3946                         
3947                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3948                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3949                                   nodemap->nodes[j].pnn, 
3950                                   nodemap->nodes[i].pnn, 
3951                                   remote_nodemaps[j]->nodes[i].flags,
3952                                   nodemap->nodes[i].flags));
3953                                 if (i == j) {
3954                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3955                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3956                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3957                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3958                                                     vnnmap);
3959                                         return;
3960                                 } else {
3961                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3962                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3963                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3964                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3965                                                     vnnmap);
3966                                         return;
3967                                 }
3968                         }
3969                 }
3970         }
3971
3972
3973         /* count how many active nodes there are */
3974         num_lmasters  = 0;
3975         for (i=0; i<nodemap->num; i++) {
3976                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3977                         if (ctdb_node_has_capabilities(rec->caps,
3978                                                        ctdb->nodes[i]->pnn,
3979                                                        CTDB_CAP_LMASTER)) {
3980                                 num_lmasters++;
3981                         }
3982                 }
3983         }
3984
3985
3986         /* There must be the same number of lmasters in the vnn map as
3987          * there are active nodes with the lmaster capability...  or
3988          * do a recovery.
3989          */
3990         if (vnnmap->size != num_lmasters) {
3991                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3992                           vnnmap->size, num_lmasters));
3993                 ctdb_set_culprit(rec, ctdb->pnn);
3994                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3995                 return;
3996         }
3997
3998         /* verify that all active nodes in the nodemap also exist in 
3999            the vnnmap.
4000          */
4001         for (j=0; j<nodemap->num; j++) {
4002                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4003                         continue;
4004                 }
4005                 if (nodemap->nodes[j].pnn == pnn) {
4006                         continue;
4007                 }
4008
4009                 for (i=0; i<vnnmap->size; i++) {
4010                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
4011                                 break;
4012                         }
4013                 }
4014                 if (i == vnnmap->size) {
4015                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
4016                                   nodemap->nodes[j].pnn));
4017                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4018                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4019                         return;
4020                 }
4021         }
4022
4023         
4024         /* verify that all other nodes have the same vnnmap
4025            and are from the same generation
4026          */
4027         for (j=0; j<nodemap->num; j++) {
4028                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4029                         continue;
4030                 }
4031                 if (nodemap->nodes[j].pnn == pnn) {
4032                         continue;
4033                 }
4034
4035                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4036                                           mem_ctx, &remote_vnnmap);
4037                 if (ret != 0) {
4038                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4039                                   nodemap->nodes[j].pnn));
4040                         return;
4041                 }
4042
4043                 /* verify the vnnmap generation is the same */
4044                 if (vnnmap->generation != remote_vnnmap->generation) {
4045                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4046                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4047                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4048                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4049                         return;
4050                 }
4051
4052                 /* verify the vnnmap size is the same */
4053                 if (vnnmap->size != remote_vnnmap->size) {
4054                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
4055                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4056                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4057                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4058                         return;
4059                 }
4060
4061                 /* verify the vnnmap is the same */
4062                 for (i=0;i<vnnmap->size;i++) {
4063                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4064                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
4065                                           nodemap->nodes[j].pnn));
4066                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4067                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4068                                             vnnmap);
4069                                 return;
4070                         }
4071                 }
4072         }
4073
4074         /* we might need to change who has what IP assigned */
4075         if (rec->need_takeover_run) {
4076                 uint32_t culprit = (uint32_t)-1;
4077
4078                 rec->need_takeover_run = false;
4079
4080                 /* update the list of public ips that a node can handle for
4081                    all connected nodes
4082                 */
4083                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4084                 if (ret != 0) {
4085                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4086                                          culprit));
4087                         rec->need_takeover_run = true;
4088                         return;
4089                 }
4090
4091                 /* execute the "startrecovery" event script on all nodes */
4092                 ret = run_startrecovery_eventscript(rec, nodemap);
4093                 if (ret!=0) {
4094                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4095                         ctdb_set_culprit(rec, ctdb->pnn);
4096                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4097                         return;
4098                 }
4099
4100                 /* If takeover run fails, then the offending nodes are
4101                  * assigned ban culprit counts. And we re-try takeover.
4102                  * If takeover run fails repeatedly, the node would get
4103                  * banned.
4104                  *
4105                  * If rec->need_takeover_run is not set to true at this
4106                  * failure, monitoring is disabled cluster-wide (via
4107                  * startrecovery eventscript) and will not get enabled.
4108                  */
4109                 if (!do_takeover_run(rec, nodemap, true)) {
4110                         return;
4111                 }
4112
4113                 /* execute the "recovered" event script on all nodes */
4114                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4115 #if 0
4116 // we cant check whether the event completed successfully
4117 // since this script WILL fail if the node is in recovery mode
4118 // and if that race happens, the code here would just cause a second
4119 // cascading recovery.
4120                 if (ret!=0) {
4121                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4122                         ctdb_set_culprit(rec, ctdb->pnn);
4123                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4124                 }
4125 #endif
4126         }
4127 }
4128
4129 /*
4130   the main monitoring loop
4131  */
4132 static void monitor_cluster(struct ctdb_context *ctdb)
4133 {
4134         struct ctdb_recoverd *rec;
4135
4136         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4137
4138         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4139         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4140
4141         rec->ctdb = ctdb;
4142
4143         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
4144         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
4145
4146         rec->recovery = ctdb_op_init(rec, "recoveries");
4147         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
4148
4149         rec->priority_time = timeval_current();
4150
4151         /* register a message port for sending memory dumps */
4152         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4153
4154         /* register a message port for recovery elections */
4155         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4156
4157         /* when nodes are disabled/enabled */
4158         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4159
4160         /* when we are asked to puch out a flag change */
4161         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4162
4163         /* register a message port for vacuum fetch */
4164         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4165
4166         /* register a message port for reloadnodes  */
4167         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4168
4169         /* register a message port for performing a takeover run */
4170         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4171
4172         /* register a message port for disabling the ip check for a short while */
4173         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4174
4175         /* register a message port for updating the recovery daemons node assignment for an ip */
4176         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4177
4178         /* register a message port for forcing a rebalance of a node next
4179            reallocation */
4180         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4181
4182         /* Register a message port for disabling takeover runs */
4183         ctdb_client_set_message_handler(ctdb,
4184                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4185                                         disable_takeover_runs_handler, rec);
4186
4187         /* Register a message port for disabling recoveries */
4188         ctdb_client_set_message_handler(ctdb,
4189                                         CTDB_SRVID_DISABLE_RECOVERIES,
4190                                         disable_recoveries_handler, rec);
4191
4192         /* register a message port for detaching database */
4193         ctdb_client_set_message_handler(ctdb,
4194                                         CTDB_SRVID_DETACH_DATABASE,
4195                                         detach_database_handler, rec);
4196
4197         for (;;) {
4198                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4199                 struct timeval start;
4200                 double elapsed;
4201
4202                 if (!mem_ctx) {
4203                         DEBUG(DEBUG_CRIT,(__location__
4204                                           " Failed to create temp context\n"));
4205                         exit(-1);
4206                 }
4207
4208                 start = timeval_current();
4209                 main_loop(ctdb, rec, mem_ctx);
4210                 talloc_free(mem_ctx);
4211
4212                 /* we only check for recovery once every second */
4213                 elapsed = timeval_elapsed(&start);
4214                 if (elapsed < ctdb->tunable.recover_interval) {
4215                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4216                                           - elapsed);
4217                 }
4218         }
4219 }
4220
4221 /*
4222   event handler for when the main ctdbd dies
4223  */
4224 static void ctdb_recoverd_parent(struct tevent_context *ev,
4225                                  struct tevent_fd *fde,
4226                                  uint16_t flags, void *private_data)
4227 {
4228         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4229         _exit(1);
4230 }
4231
4232 /*
4233   called regularly to verify that the recovery daemon is still running
4234  */
4235 static void ctdb_check_recd(struct tevent_context *ev,
4236                             struct tevent_timer *te,
4237                             struct timeval yt, void *p)
4238 {
4239         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4240
4241         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4242                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4243
4244                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
4245                                  ctdb_restart_recd, ctdb);
4246
4247                 return;
4248         }
4249
4250         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4251                          timeval_current_ofs(30, 0),
4252                          ctdb_check_recd, ctdb);
4253 }
4254
4255 static void recd_sig_child_handler(struct tevent_context *ev,
4256                                    struct tevent_signal *se, int signum,
4257                                    int count, void *dont_care,
4258                                    void *private_data)
4259 {
4260 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4261         int status;
4262         pid_t pid = -1;
4263
4264         while (pid != 0) {
4265                 pid = waitpid(-1, &status, WNOHANG);
4266                 if (pid == -1) {
4267                         if (errno != ECHILD) {
4268                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4269                         }
4270                         return;
4271                 }
4272                 if (pid > 0) {
4273                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4274                 }
4275         }
4276 }
4277
4278 /*
4279   startup the recovery daemon as a child of the main ctdb daemon
4280  */
4281 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4282 {
4283         int fd[2];
4284         struct tevent_signal *se;
4285         struct tevent_fd *fde;
4286
4287         if (pipe(fd) != 0) {
4288                 return -1;
4289         }
4290
4291         ctdb->recoverd_pid = ctdb_fork(ctdb);
4292         if (ctdb->recoverd_pid == -1) {
4293                 return -1;
4294         }
4295
4296         if (ctdb->recoverd_pid != 0) {
4297                 talloc_free(ctdb->recd_ctx);
4298                 ctdb->recd_ctx = talloc_new(ctdb);
4299                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4300
4301                 close(fd[0]);
4302                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4303                                  timeval_current_ofs(30, 0),
4304                                  ctdb_check_recd, ctdb);
4305                 return 0;
4306         }
4307
4308         close(fd[1]);
4309
4310         srandom(getpid() ^ time(NULL));
4311
4312         ctdb_set_process_name("ctdb_recovered");
4313         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4314                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4315                 exit(1);
4316         }
4317
4318         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4319
4320         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4321                             ctdb_recoverd_parent, &fd[0]);
4322         tevent_fd_set_auto_close(fde);
4323
4324         /* set up a handler to pick up sigchld */
4325         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4326                                recd_sig_child_handler, ctdb);
4327         if (se == NULL) {
4328                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4329                 exit(1);
4330         }
4331
4332         monitor_cluster(ctdb);
4333
4334         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4335         return -1;
4336 }
4337
4338 /*
4339   shutdown the recovery daemon
4340  */
4341 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4342 {
4343         if (ctdb->recoverd_pid == 0) {
4344                 return;
4345         }
4346
4347         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4348         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4349
4350         TALLOC_FREE(ctdb->recd_ctx);
4351         TALLOC_FREE(ctdb->recd_ping_count);
4352 }
4353
4354 static void ctdb_restart_recd(struct tevent_context *ev,
4355                               struct tevent_timer *te,
4356                               struct timeval t, void *private_data)
4357 {
4358         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4359
4360         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4361         ctdb_stop_recoverd(ctdb);
4362         ctdb_start_recoverd(ctdb);
4363 }