/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"

#include <popt.h>
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>

#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
#include "lib/util/sys_rw.h"
#include "lib/util/util_process.h"

#include "ctdb_private.h"
#include "ctdb_client.h"

#include "common/system.h"
#include "common/common.h"
#include "common/logging.h"

#include "ctdb_cluster_mutex.h"

/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct ctdb_srvid_message *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct ctdb_srvid_message *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        if (*requests == NULL) {
                return;
        }

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct ctdb_srvid_message *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

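/* Illustrative sketch, not part of this file: a requester that queued a
 * message with a non-zero srvid would decode the reply payload sent by
 * srvid_request_reply() along these lines:
 *
 *   int32_t status;
 *   if (result.dsize == sizeof(status)) {
 *           memcpy(&status, result.dptr, sizeof(status));
 *   }
 *
 * where status is 0 on success or a negative errno value, e.g. the
 * -ENOMEM sent by the nomem path above.
 */
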
/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        struct tevent_timer *timer;
        bool in_progress;
        const char *name;
};

static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
        struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);

        if (state != NULL) {
                state->in_progress = false;
                state->name = name;
        }

        return state;
}

static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
        return state->timer != NULL;
}

static bool ctdb_op_begin(struct ctdb_op_state *state)
{
        if (ctdb_op_is_disabled(state)) {
                DEBUG(DEBUG_NOTICE,
                      ("Unable to begin - %s are disabled\n", state->name));
                return false;
        }

        state->in_progress = true;
        return true;
}

static bool ctdb_op_end(struct ctdb_op_state *state)
{
        return state->in_progress = false;
}

static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
        return state->in_progress;
}

static void ctdb_op_enable(struct ctdb_op_state *state)
{
        TALLOC_FREE(state->timer);
}

static void ctdb_op_timeout_handler(struct tevent_context *ev,
                                    struct tevent_timer *te,
                                    struct timeval yt, void *p)
{
        struct ctdb_op_state *state =
                talloc_get_type(p, struct ctdb_op_state);

        DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
        ctdb_op_enable(state);
}

static int ctdb_op_disable(struct ctdb_op_state *state,
                           struct tevent_context *ev,
                           uint32_t timeout)
{
        if (timeout == 0) {
                DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
                ctdb_op_enable(state);
                return 0;
        }

        if (state->in_progress) {
                DEBUG(DEBUG_ERR,
                      ("Unable to disable %s - in progress\n", state->name));
                return -EAGAIN;
        }

        DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
                            state->name, timeout));

        /* Clear any old timers */
        talloc_free(state->timer);

        /* Arrange for the timeout to occur */
        state->timer = tevent_add_timer(ev, state,
                                        timeval_current_ofs(timeout, 0),
                                        ctdb_op_timeout_handler, state);
        if (state->timer == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
                return -ENOMEM;
        }

        return 0;
}
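
/* Usage sketch (illustrative only) of how the helpers above are wired
 * together by their callers later in this file:
 *
 *   struct ctdb_op_state *op = ctdb_op_init(rec, "takeover runs");
 *
 *   if (ctdb_op_begin(op)) {
 *           ... perform the operation ...
 *           ctdb_op_end(op);
 *   }
 *
 *   ctdb_op_disable(op, ev, 60);   // refuse new begins for 60 seconds
 *
 * "Disabled" simply means the re-enable timer is still pending, which
 * is exactly what ctdb_op_is_disabled() tests.
 */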

struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t last_culprit_node;
        struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct tevent_timer *send_election_te;
        struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_iface_list_old *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
        bool frozen_on_inactive;
        struct ctdb_cluster_mutex_handle *recovery_lock_handle;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

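/* Note: both macros compute a fresh absolute deadline from the recovery
 * tunables (tunable.recover_timeout / tunable.recover_interval) on every
 * call, and both assume a variable named "ctdb" is in scope at the call
 * site.
 */
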
static void ctdb_restart_recd(struct tevent_context *ev,
                              struct tevent_timer *te, struct timeval t,
                              void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_state bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the trouble maker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the trouble maker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}

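/* How these credits are consumed: ban_misbehaving_nodes() below bans a
 * node once its count reaches 2 * num_nodes, while the timeval_elapsed()
 * check above forgives all accumulated credits once a node has behaved
 * for longer than the recovery_grace_period tunable.
 */
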
/*
  Retrieve capabilities from all connected nodes
 */
static int update_capabilities(struct ctdb_recoverd *rec,
                               struct ctdb_node_map_old *nodemap)
{
        uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node_capabilities *caps;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        caps = ctdb_get_capabilities(ctdb, tmp_ctx,
                                     CONTROL_TIMEOUT(), nodemap);

        if (caps == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Failed to get node capabilities\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
        if (capp == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__
                       " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
        ctdb->capabilities = *capp;

        TALLOC_FREE(rec->caps);
        rec->caps = talloc_steal(rec, caps);

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
                             uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}
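
/* rec_mode is one of the CTDB recovery mode constants: callers pass
 * CTDB_RECOVERY_ACTIVE to put all active nodes into recovery mode and
 * CTDB_RECOVERY_NORMAL to take them out again once recovery completes.
 */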

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].db_id, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].db_id, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}

/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/**
 * Process one element of the vacuum fetch list:
 * Migrate it over to us with the special flag
 * CTDB_CALL_FLAG_VACUUM_MIGRATION.
 */
static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
                                     uint32_t pnn,
                                     struct ctdb_rec_data_old *r)
{
        struct ctdb_client_call_state *state;
        TDB_DATA data;
        struct ctdb_ltdb_header *hdr;
        struct ctdb_call call;

        ZERO_STRUCT(call);
        call.call_id = CTDB_NULL_FUNC;
        call.flags = CTDB_IMMEDIATE_MIGRATION;
        call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

        call.key.dptr = &r->data[0];
        call.key.dsize = r->keylen;

        /* ensure we don't block this daemon - just skip a record if we can't get
           the chainlock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
                return true;
        }

        data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
        if (data.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (hdr->dmaster == pnn) {
                /* it's already local */
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        free(data.dptr);

        state = ctdb_call_send(ctdb_db, &call);
        tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                return false;
        }
        state->async.fn = vacuum_fetch_callback;
        state->async.private_data = NULL;

        return true;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
                                 void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;

        recs = (struct ctdb_marshall_buffer *)data.dptr;

        if (recs->count == 0) {
                goto done;
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                goto done;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                goto done;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                goto done;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
        }

        r = (struct ctdb_rec_data_old *)&recs->data[0];
        while (recs->count) {
                bool ok;

                ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
                if (!ok) {
                        break;
                }

                /* records are packed back-to-back in the marshall buffer;
                   r->length is the total size of the current record */
                r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
                recs->count--;
        }

done:
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(uint64_t srvid, TDB_DATA data,
                                    void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t db_id;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
                         ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                tevent_loop_once(ctdb->ev);
        }
}
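
/* Worked example: ctdb_wait_timeout(ctdb, 2.5) splits the delay into
 * secs = 2 (by truncation in the timeval_current_ofs() call) and
 * usecs = 500000, then pumps the event loop, so other events are still
 * serviced while we "sleep".
 */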

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct tevent_context *ev,
                                  struct tevent_timer *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map_old *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return -1;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return -1;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return 0;
}

/* Create a new random generation id.
   The generation id can not be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}

static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
{
        return (rec->recovery_lock_handle != NULL);
}

struct hold_reclock_state {
        bool done;
        bool locked;
        double latency;
};

static void take_reclock_handler(char status,
                                 double latency,
                                 void *private_data)
{
        struct hold_reclock_state *s =
                (struct hold_reclock_state *) private_data;

        switch (status) {
        case '0':
                s->latency = latency;
                break;

        case '1':
                DEBUG(DEBUG_ERR,
                      ("Unable to take recovery lock - contention\n"));
                break;

        default:
                DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
        }

        s->done = true;
        s->locked = (status == '0');
}

static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);

static void lost_reclock_handler(void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type_abort(
                private_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,
              ("Recovery lock helper terminated unexpectedly - "
               "trying to retake recovery lock\n"));
        TALLOC_FREE(rec->recovery_lock_handle);
        if (! ctdb_recovery_lock(rec)) {
                DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
        }
}

static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_cluster_mutex_handle *h;
        struct hold_reclock_state s = {
                .done = false,
                .locked = false,
                .latency = 0,
        };

        h = ctdb_cluster_mutex(rec, ctdb, ctdb->recovery_lock, 0,
                               take_reclock_handler, &s,
                               lost_reclock_handler, rec);
        if (h == NULL) {
                return false;
        }

        while (!s.done) {
                tevent_loop_once(ctdb->ev);
        }

        if (! s.locked) {
                talloc_free(h);
                return false;
        }

        rec->recovery_lock_handle = h;
        ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(),
                                           s.latency);

        return true;
}
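
/* Status protocol, as handled above: the cluster mutex helper reports a
 * single character - '0' means the lock was taken, '1' means it was
 * lost to contention, anything else is an unexpected error.  If the
 * helper later dies while we hold the lock, lost_reclock_handler()
 * simply drops the stale handle and tries to retake the lock.
 */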

static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
{
        if (rec->recovery_lock_handle != NULL) {
                DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
                TALLOC_FREE(rec->recovery_lock_handle);
        }
}

static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i;
        struct ctdb_banning_state *ban_state;

        *self_ban = false;
        for (i=0; i<ctdb->num_nodes; i++) {
                if (ctdb->nodes[i]->ban_state == NULL) {
                        continue;
                }
                ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
                if (ban_state->count < 2*ctdb->num_nodes) {
                        continue;
                }

                DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
                        ctdb->nodes[i]->pnn, ban_state->count,
                        ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
                ban_state->count = 0;

                /* Banning ourself? */
                if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
                        *self_ban = true;
                }
        }
}

struct helper_state {
        int fd[2];
        pid_t pid;
        int result;
        bool done;
};

static void helper_handler(struct tevent_context *ev,
                           struct tevent_fd *fde,
                           uint16_t flags, void *private_data)
{
        struct helper_state *state = talloc_get_type_abort(
                private_data, struct helper_state);
        int ret;

        ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
        if (ret != sizeof(state->result)) {
                state->result = EPIPE;
        }

        state->done = true;
}

static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
                      const char *prog, const char *arg, const char *type)
{
        struct helper_state *state;
        struct tevent_fd *fde;
        const char **args;
        int nargs, ret;

        state = talloc_zero(mem_ctx, struct helper_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        state->pid = -1;

        ret = pipe(state->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create pipe for %s helper\n", type));
                goto fail;
        }

        set_close_on_exec(state->fd[0]);

        nargs = 4;
        args = talloc_array(state, const char *, nargs);
        if (args == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }

        args[0] = talloc_asprintf(args, "%d", state->fd[1]);
        if (args[0] == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }
        args[1] = rec->ctdb->daemon.name;
        args[2] = arg;
        args[3] = NULL;

        if (args[2] == NULL) {
                nargs = 3;
        }

        state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
        if (state->pid == -1) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create child for %s helper\n", type));
                goto fail;
        }

        close(state->fd[1]);
        state->fd[1] = -1;

        state->done = false;

        fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
                            TEVENT_FD_READ, helper_handler, state);
        if (fde == NULL) {
                goto fail;
        }
        tevent_fd_set_auto_close(fde);

        while (!state->done) {
                tevent_loop_once(rec->ctdb->ev);
        }

        close(state->fd[0]);
        state->fd[0] = -1;

        if (state->result != 0) {
                goto fail;
        }

        ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        talloc_free(state);
        return 0;

fail:
        if (state->fd[0] != -1) {
                close(state->fd[0]);
        }
        if (state->fd[1] != -1) {
                close(state->fd[1]);
        }
        if (state->pid != -1) {
                ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        }
        talloc_free(state);
        return -1;
}
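
/* Wire protocol used above, for reference: the helper child inherits
 * fd[1] as its first argument, followed by the ctdb daemon socket name
 * and an optional extra argument, and it reports completion by writing
 * a single int (0 on success) back over the pipe; an unexpected EOF is
 * mapped to EPIPE by helper_handler().
 */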


static bool do_takeover_run(struct ctdb_recoverd *rec,
                            struct ctdb_node_map_old *nodemap)
{
        uint32_t *nodes = NULL;
        struct ctdb_disable_message dtr;
        TDB_DATA data;
        int i;
        uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
        int ret;
        bool ok;

        DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));

        if (ctdb_op_is_in_progress(rec->takeover_run)) {
                DEBUG(DEBUG_ERR, (__location__
                                  " takeover run already in progress\n"));
                ok = false;
                goto done;
        }

        if (!ctdb_op_begin(rec->takeover_run)) {
                ok = false;
                goto done;
        }

        /* Disable IP checks (takeover runs, really) on other nodes
         * while doing this takeover run.  This will stop those other
         * nodes from triggering takeover runs when they think they
         * should be hosting an IP but it isn't yet on an interface.
         * Don't wait for replies since a failure here might cause some
         * noise in the logs but will not actually cause a problem.
         */
        ZERO_STRUCT(dtr);
        dtr.srvid = 0; /* No reply */
        dtr.pnn = -1;

        data.dptr  = (uint8_t*)&dtr;
        data.dsize = sizeof(dtr);

        nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);

        /* Disable for 60 seconds.  This can be a tunable later if
         * necessary.
         */
        dtr.timeout = 60;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
                }
        }

        ret = ctdb_takeover_run(rec->ctdb, nodemap,
                                rec->force_rebalance_nodes);

        /* Reenable takeover runs and IP checks on other nodes */
        dtr.timeout = 0;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
                }
        }

        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
                ok = false;
                goto done;
        }

        ok = true;
        /* Takeover run was successful so clear force rebalance targets */
        if (rebalance_nodes == rec->force_rebalance_nodes) {
                TALLOC_FREE(rec->force_rebalance_nodes);
        } else {
                DEBUG(DEBUG_WARNING,
                      ("Rebalance target nodes changed during takeover run - not clearing\n"));
        }
done:
        rec->need_takeover_run = !ok;
        talloc_free(nodes);
        ctdb_op_end(rec->takeover_run);

        DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
        return ok;
}

static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
{
        static char prog[PATH_MAX+1] = "";
        const char *arg;

        if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
                             "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
                             "ctdb_recovery_helper")) {
                ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
        }

        arg = talloc_asprintf(mem_ctx, "%u", new_generation());
        if (arg == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);

        return helper_run(rec, mem_ctx, prog, arg, "recovery");
}
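
/* Helper resolution above follows the usual ctdb_set_helper()
 * convention: the CTDB_RECOVERY_HELPER environment variable wins if it
 * is set, otherwise the binary is expected at
 * CTDB_HELPER_BINDIR/ctdb_recovery_helper.  The freshly generated
 * generation id is passed to the helper as its only extra argument.
 */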

/*
  we are the recmaster, and recovery is needed - start a recovery run
 */
static int do_recovery(struct ctdb_recoverd *rec,
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
        bool self_ban;

        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));

        /* Check if the current node is still the recmaster.  It's possible that
         * re-election has changed the recmaster.
         */
        if (pnn != rec->recmaster) {
                DEBUG(DEBUG_NOTICE,
                      ("Recovery master changed to %u, aborting recovery\n",
                       rec->recmaster));
                return -1;
        }

        /* if recovery fails, force it again */
        rec->need_recovery = true;

        if (!ctdb_op_begin(rec->recovery)) {
                return -1;
        }

        if (rec->election_timeout) {
                /* an election is in progress */
                DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
                goto fail;
        }

        ban_misbehaving_nodes(rec, &self_ban);
        if (self_ban) {
                DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
                goto fail;
        }

        if (ctdb->recovery_lock != NULL) {
                if (ctdb_recovery_have_lock(rec)) {
                        DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
                } else {
                        DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
                                             ctdb->recovery_lock));
                        if (!ctdb_recovery_lock(rec)) {
                                if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
                                        /* If ctdb is trying first recovery, it's
                                         * possible that the current node does not
                                         * yet know who the recmaster is.
                                         */
                                        DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
                                                          " - retrying recovery\n"));
                                        goto fail;
                                }

                                DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
                                                 "and banning ourself for %u seconds\n",
                                                 ctdb->tunable.recovery_ban_period));
                                ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
                                goto fail;
                        }
                        DEBUG(DEBUG_NOTICE,
                              ("Recovery lock taken successfully by recovery daemon\n"));
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));

        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
                goto fail;
        }

        /* we do the db creation before we set the recovery mode, so the freeze happens
           on all databases we will be dealing with. */

        /* verify that we have all the databases any other node has */
        ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
                goto fail;
        }

        /* verify that all other nodes have all our databases */
        ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                goto fail;
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));

        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                goto fail;
        }

        /*
          update all nodes to have the same flags that we have
         */
        for (i=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
                        if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                                DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
                        } else {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                                goto fail;
                        }
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));

        ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }

        do_takeover_run(rec, nodemap);

        /* send a message to all clients telling them that the cluster
           has been reconfigured */
        ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                       CTDB_SRVID_RECONFIGURE, tdb_null);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
                goto fail;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));

        rec->need_recovery = false;
        ctdb_op_end(rec->recovery);

        /* we managed to complete a full recovery, make sure to forgive
           any past sins by the nodes that could now participate in the
           recovery.
        */
        DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
        for (i=0;i<nodemap->num;i++) {
                struct ctdb_banning_state *ban_state;

                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
                if (ban_state == NULL) {
                        continue;
                }

                ban_state->count = 0;
        }

        /* We just finished a recovery successfully.
           We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_op_disable(rec->recovery, ctdb->ev,
                        ctdb->tunable.rerecovery_timeout);
        return 0;

fail:
        ctdb_op_end(rec->recovery);
        return -1;
}


/*
  elections are won by first checking the number of connected nodes, then
  the priority time, then the pnn
 */
struct election_message {
        uint32_t num_connected;
        struct timeval priority_time;
        uint32_t pnn;
        uint32_t node_flags;
};

/*
  form this node's election data
 */
static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
{
        int ret, i;
        struct ctdb_node_map_old *nodemap;
        struct ctdb_context *ctdb = rec->ctdb;

        ZERO_STRUCTP(em);

        em->pnn = rec->ctdb->pnn;
        em->priority_time = rec->priority_time;

        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
                return;
        }

        rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
        em->node_flags = rec->node_flags;

        for (i=0;i<nodemap->num;i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        em->num_connected++;
                }
        }

        /* we shouldn't try to win this election if we can't be a recmaster */
        if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
                em->num_connected = 0;
                em->priority_time = timeval_current();
        }

        talloc_free(nodemap);
}

/*
  see if the given election data wins
 */
static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
{
        struct election_message myem;
        int cmp = 0;

        ctdb_election_data(rec, &myem);

        /* we can't win if we don't have the recmaster capability */
        if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
                return false;
        }

        /* we can't win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
        }

        /* we can't win if we are stopped */
        if (rec->node_flags & NODE_FLAGS_STOPPED) {
                return false;
        }

        /* we will automatically win if the other node is banned */
        if (em->node_flags & NODE_FLAGS_BANNED) {
                return true;
        }

        /* we will automatically win if the other node is stopped */
        if (em->node_flags & NODE_FLAGS_STOPPED) {
                return true;
        }

        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
        }

        /* then the longest running node */
        if (cmp == 0) {
                cmp = timeval_compare(&em->priority_time, &myem.priority_time);
        }

        if (cmp == 0) {
                cmp = (int)myem.pnn - (int)em->pnn;
        }

        return cmp > 0;
}
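
/* Worked example of the ordering above: a candidate seeing 3 connected
 * nodes beats one seeing 2 regardless of uptime; on a tie, the earlier
 * priority_time (the longest-running daemon) wins; and only if that
 * also ties does the higher pnn win the comparison.
 */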
1505
1506 /*
1507   send out an election request
1508  */
1509 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1510 {
1511         int ret;
1512         TDB_DATA election_data;
1513         struct election_message emsg;
1514         uint64_t srvid;
1515         struct ctdb_context *ctdb = rec->ctdb;
1516
1517         srvid = CTDB_SRVID_ELECTION;
1518
1519         ctdb_election_data(rec, &emsg);
1520
1521         election_data.dsize = sizeof(struct election_message);
1522         election_data.dptr  = (unsigned char *)&emsg;
1523
1524
1525         /* first we assume we will win the election and set the
1526            recovery master to be ourselves on the current node
1527          */
1528         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1529                                      CTDB_CURRENT_NODE, pnn);
1530         if (ret != 0) {
1531                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1532                 return -1;
1533         }
1534         rec->recmaster = pnn;
1535
1536         /* send an election message to all active nodes */
1537         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1538         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1539 }
1540
1541 /*
1542   we think we are winning the election - send a broadcast election request
1543  */
1544 static void election_send_request(struct tevent_context *ev,
1545                                   struct tevent_timer *te,
1546                                   struct timeval t, void *p)
1547 {
1548         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1549         int ret;
1550
1551         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1552         if (ret != 0) {
1553                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1554         }
1555
1556         TALLOC_FREE(rec->send_election_te);
1557 }
1558
1559 /*
1560   handler for memory dumps
1561 */
1562 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1563 {
1564         struct ctdb_recoverd *rec = talloc_get_type(
1565                 private_data, struct ctdb_recoverd);
1566         struct ctdb_context *ctdb = rec->ctdb;
1567         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1568         TDB_DATA *dump;
1569         int ret;
1570         struct ctdb_srvid_message *rd;
1571
1572         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1573                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1574                 talloc_free(tmp_ctx);
1575                 return;
1576         }
1577         rd = (struct ctdb_srvid_message *)data.dptr;
1578
1579         dump = talloc_zero(tmp_ctx, TDB_DATA);
1580         if (dump == NULL) {
1581                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1582                 talloc_free(tmp_ctx);
1583                 return;
1584         }
1585         ret = ctdb_dump_memory(ctdb, dump);
1586         if (ret != 0) {
1587                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1588                 talloc_free(tmp_ctx);
1589                 return;
1590         }
1591
1592         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1593
1594         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1595         if (ret != 0) {
1596                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1597                 talloc_free(tmp_ctx);
1598                 return;
1599         }
1600
1601         talloc_free(tmp_ctx);
1602 }
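
/*
 * Editorial sketch (not in the original source): roughly how a client
 * such as "ctdb rddumpmemory" can drive the handler above.  The caller
 * picks a private reply srvid to listen on and sends its own PNN plus
 * that srvid to CTDB_SRVID_MEM_DUMP; the dump text later arrives as a
 * message on the reply srvid.  The function name is illustrative only.
 */
static int request_recd_memdump_sketch(struct ctdb_context *ctdb,
                                       uint32_t target_pnn,
                                       uint64_t reply_srvid)
{
        struct ctdb_srvid_message rd = {
                .pnn = ctdb_get_pnn(ctdb),
                .srvid = reply_srvid,
        };
        TDB_DATA data = {
                .dptr = (uint8_t *)&rd,
                .dsize = sizeof(rd),
        };

        /* ask the recovery daemon on target_pnn for its memory dump */
        return ctdb_client_send_message(ctdb, target_pnn,
                                        CTDB_SRVID_MEM_DUMP, data);
}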
1603
1604 /*
1605   handler for reload_nodes
1606 */
1607 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1608                                  void *private_data)
1609 {
1610         struct ctdb_recoverd *rec = talloc_get_type(
1611                 private_data, struct ctdb_recoverd);
1612
1613         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1614
1615         ctdb_load_nodes_file(rec->ctdb);
1616 }
1617
1618
1619 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1620                                         void *private_data)
1621 {
1622         struct ctdb_recoverd *rec = talloc_get_type(
1623                 private_data, struct ctdb_recoverd);
1624         struct ctdb_context *ctdb = rec->ctdb;
1625         uint32_t pnn;
1626         uint32_t *t;
1627         int len;
1628
1629         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1630                 return;
1631         }
1632
1633         if (data.dsize != sizeof(uint32_t)) {
1634                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(uint32_t)));
1635                 return;
1636         }
1637
1638         pnn = *(uint32_t *)&data.dptr[0];
1639
1640         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1641
1642         /* Copy any existing list of nodes.  There's probably some
1643          * sort of realloc variant that will do this but we need to
1644          * make sure that freeing the old array also cancels the timer
1645          * event for the timeout... not sure if realloc will do that.
1646          */
1647         len = (rec->force_rebalance_nodes != NULL) ?
1648                 talloc_array_length(rec->force_rebalance_nodes) :
1649                 0;
1650
1651         /* This allows duplicates to be added but they don't cause
1652          * harm.  A call to add a duplicate PNN arguably means that
1653          * the timeout should be reset, so this is the simplest
1654          * solution.
1655          */
1656         t = talloc_zero_array(rec, uint32_t, len+1);
1657         CTDB_NO_MEMORY_VOID(ctdb, t);
1658         if (len > 0) {
1659                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1660         }
1661         t[len] = pnn;
1662
1663         talloc_free(rec->force_rebalance_nodes);
1664
1665         rec->force_rebalance_nodes = t;
1666 }
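
/*
 * Editorial sketch (not in the original source): a matching sender for
 * the handler above.  The payload is a single uint32_t holding the PNN
 * whose IPs should be rebalanced; broadcasting means that whichever
 * node currently holds the recmaster role will pick it up.  The
 * function name is illustrative only.
 */
static int send_rebalance_request_sketch(struct ctdb_context *ctdb,
                                         uint32_t target_pnn)
{
        TDB_DATA data = {
                .dptr = (uint8_t *)&target_pnn,
                .dsize = sizeof(target_pnn),
        };

        return ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                        CTDB_SRVID_REBALANCE_NODE, data);
}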
1667
1668
1669
1670 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1671                                     TDB_DATA data,
1672                                     struct ctdb_op_state *op_state)
1673 {
1674         struct ctdb_disable_message *r;
1675         uint32_t timeout;
1676         TDB_DATA result;
1677         int32_t ret = 0;
1678
1679         /* Validate input data */
1680         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1682                                  "expecting %lu\n", (long unsigned)data.dsize,
1683                                  (long unsigned)sizeof(struct ctdb_disable_message)));
1684                 return;
1685         }
1686         if (data.dptr == NULL) {
1687                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1688                 return;
1689         }
1690
1691         r = (struct ctdb_disable_message *)data.dptr;
1692         timeout = r->timeout;
1693
1694         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1695         if (ret != 0) {
1696                 goto done;
1697         }
1698
1699         /* Returning our PNN tells the caller that we succeeded */
1700         ret = ctdb_get_pnn(ctdb);
1701 done:
1702         result.dsize = sizeof(int32_t);
1703         result.dptr  = (uint8_t *)&ret;
1704         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1705 }
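
/*
 * Editorial sketch (not in the original source): the message the
 * validation above expects.  A caller fills in its own PNN, a reply
 * srvid and the disable duration in seconds (a timeout of 0 re-enables
 * the operation).  Sending to CTDB_SRVID_DISABLE_TAKEOVER_RUNS or
 * CTDB_SRVID_DISABLE_RECOVERIES selects which of the two handlers
 * below receives it.  The function name is illustrative only.
 */
static int send_disable_request_sketch(struct ctdb_context *ctdb,
                                       uint64_t srvid, uint32_t timeout,
                                       uint64_t reply_srvid)
{
        struct ctdb_disable_message r = {
                .pnn = ctdb_get_pnn(ctdb),
                .srvid = reply_srvid,
                .timeout = timeout,
        };
        TDB_DATA data = {
                .dptr = (uint8_t *)&r,
                .dsize = sizeof(r),
        };

        /* the reply carries the responder's PNN on success, -1 on error */
        return ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                        srvid, data);
}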
1706
1707 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1708                                           void *private_data)
1709 {
1710         struct ctdb_recoverd *rec = talloc_get_type(
1711                 private_data, struct ctdb_recoverd);
1712
1713         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1714 }
1715
1716 /* Backward compatibility for this SRVID */
1717 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1718                                      void *private_data)
1719 {
1720         struct ctdb_recoverd *rec = talloc_get_type(
1721                 private_data, struct ctdb_recoverd);
1722         uint32_t timeout;
1723
1724         if (data.dsize != sizeof(uint32_t)) {
1725                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1726                                  "expecting %lu\n", (long unsigned)data.dsize,
1727                                  (long unsigned)sizeof(uint32_t)));
1728                 return;
1729         }
1730         if (data.dptr == NULL) {
1731                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1732                 return;
1733         }
1734
1735         timeout = *((uint32_t *)data.dptr);
1736
1737         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1738 }
1739
1740 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1741                                        void *private_data)
1742 {
1743         struct ctdb_recoverd *rec = talloc_get_type(
1744                 private_data, struct ctdb_recoverd);
1745
1746         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1747 }
1748
1749 /*
1750   handler for IP reallocate: just add it to the list of requests and
1751   handle it later in the monitor_cluster loop so we do not recurse
1752   with other requests to takeover_run()
1753 */
1754 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1755                                   void *private_data)
1756 {
1757         struct ctdb_srvid_message *request;
1758         struct ctdb_recoverd *rec = talloc_get_type(
1759                 private_data, struct ctdb_recoverd);
1760
1761         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1762                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1763                 return;
1764         }
1765
1766         request = (struct ctdb_srvid_message *)data.dptr;
1767
1768         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1769 }
1770
1771 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1772                                           struct ctdb_recoverd *rec)
1773 {
1774         TDB_DATA result;
1775         int32_t ret;
1776         struct srvid_requests *current;
1777
1778         /* Only process requests that are currently pending.  More
1779          * might come in while the takeover run is in progress and
1780          * they will need to be processed later since they might
1781          * be in response to flag changes.
1782          */
1783         current = rec->reallocate_requests;
1784         rec->reallocate_requests = NULL;
1785
1786         if (do_takeover_run(rec, rec->nodemap)) {
1787                 ret = ctdb_get_pnn(ctdb);
1788         } else {
1789                 ret = -1;
1790         }
1791
1792         result.dsize = sizeof(int32_t);
1793         result.dptr  = (uint8_t *)&ret;
1794
1795         srvid_requests_reply(ctdb, &current, result);
1796 }
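
/*
 * Editorial sketch (not in the original source): how a client can ask
 * the recmaster for a takeover run and learn the outcome.  It listens
 * on a private reply srvid, sends a ctdb_srvid_message to
 * CTDB_SRVID_TAKEOVER_RUN, and later receives the recmaster's PNN on
 * success or -1 on failure, as assembled above.  This assumes
 * ctdb_client_set_message_handler() is the same registration helper
 * used for the handlers in this file; all names are illustrative only.
 */
static void takeover_run_reply_sketch(uint64_t srvid, TDB_DATA data,
                                      void *private_data)
{
        int32_t *outcome = (int32_t *)private_data;

        if (data.dsize == sizeof(int32_t)) {
                *outcome = *(int32_t *)data.dptr;
        }
}

static int request_takeover_run_sketch(struct ctdb_context *ctdb,
                                       uint64_t reply_srvid,
                                       int32_t *outcome)
{
        struct ctdb_srvid_message rd = {
                .pnn = ctdb_get_pnn(ctdb),
                .srvid = reply_srvid,
        };
        TDB_DATA data = {
                .dptr = (uint8_t *)&rd,
                .dsize = sizeof(rd),
        };
        int ret;

        ret = ctdb_client_set_message_handler(ctdb, reply_srvid,
                                              takeover_run_reply_sketch,
                                              outcome);
        if (ret != 0) {
                return ret;
        }

        /* the reply arrives asynchronously via the handler above */
        return ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                        CTDB_SRVID_TAKEOVER_RUN, data);
}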
1797
1798 /*
1799  * handler for assigning banning credits
1800  */
1801 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1802 {
1803         struct ctdb_recoverd *rec = talloc_get_type(
1804                 private_data, struct ctdb_recoverd);
1805         uint32_t ban_pnn;
1806
1807         /* Ignore if we are not recmaster */
1808         if (rec->ctdb->pnn != rec->recmaster) {
1809                 return;
1810         }
1811
1812         if (data.dsize != sizeof(uint32_t)) {
1813                 DEBUG(DEBUG_ERR, (__location__ " invalid data size %zu\n",
1814                                   data.dsize));
1815                 return;
1816         }
1817
1818         ban_pnn = *(uint32_t *)data.dptr;
1819
1820         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1821 }
1822
1823 /*
1824   handler for recovery master elections
1825 */
1826 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1827 {
1828         struct ctdb_recoverd *rec = talloc_get_type(
1829                 private_data, struct ctdb_recoverd);
1830         struct ctdb_context *ctdb = rec->ctdb;
1831         int ret;
1832         struct election_message *em = (struct election_message *)data.dptr;
1833
1834         /* Ignore election packets from ourselves */
1835         if (ctdb->pnn == em->pnn) {
1836                 return;
1837         }
1838
1839         /* we got an election packet - update the timeout for the election */
1840         talloc_free(rec->election_timeout);
1841         rec->election_timeout = tevent_add_timer(
1842                         ctdb->ev, ctdb,
1843                         fast_start ?
1844                                 timeval_current_ofs(0, 500000) :
1845                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1846                         ctdb_election_timeout, rec);
1847
1848         /* someone called an election. check their election data
1849            and if we disagree and we would rather be the elected node, 
1850            send a new election message to all other nodes
1851          */
1852         if (ctdb_election_win(rec, em)) {
1853                 if (!rec->send_election_te) {
1854                         rec->send_election_te = tevent_add_timer(
1855                                         ctdb->ev, rec,
1856                                         timeval_current_ofs(0, 500000),
1857                                         election_send_request, rec);
1858                 }
1859                 return;
1860         }
1861
1862         /* we didn't win */
1863         TALLOC_FREE(rec->send_election_te);
1864
1865         /* Release the recovery lock file */
1866         if (ctdb_recovery_have_lock(rec)) {
1867                 ctdb_recovery_unlock(rec);
1868         }
1869
1870         /* ok, let that node become recmaster then */
1871         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1872                                      CTDB_CURRENT_NODE, em->pnn);
1873         if (ret != 0) {
1874                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1875                 return;
1876         }
1877         rec->recmaster = em->pnn;
1878
1879         return;
1880 }
1881
1882
1883 /*
1884   force the start of the election process
1885  */
1886 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1887                            struct ctdb_node_map_old *nodemap)
1888 {
1889         int ret;
1890         struct ctdb_context *ctdb = rec->ctdb;
1891
1892         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1893
1894         /* set all nodes to recovery mode to stop all internode traffic */
1895         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1896         if (ret != 0) {
1897                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1898                 return;
1899         }
1900
1901         talloc_free(rec->election_timeout);
1902         rec->election_timeout = tevent_add_timer(
1903                         ctdb->ev, ctdb,
1904                         fast_start ?
1905                                 timeval_current_ofs(0, 500000) :
1906                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1907                         ctdb_election_timeout, rec);
1908
1909         ret = send_election_request(rec, pnn);
1910         if (ret!=0) {
1911                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
1912                 return;
1913         }
1914
1915         /* wait for a few seconds to collect all responses */
1916         ctdb_wait_election(rec);
1917 }
1918
1919
1920
1921 /*
1922   handler for when a node changes its flags
1923 */
1924 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1925 {
1926         struct ctdb_recoverd *rec = talloc_get_type(
1927                 private_data, struct ctdb_recoverd);
1928         struct ctdb_context *ctdb = rec->ctdb;
1929         int ret;
1930         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1931         struct ctdb_node_map_old *nodemap=NULL;
1932         TALLOC_CTX *tmp_ctx;
1933         int i;
1934
1935         if (data.dsize != sizeof(*c)) {
1936                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
1937                 return;
1938         }
1939
1940         tmp_ctx = talloc_new(ctdb);
1941         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1942
1943         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1946                 talloc_free(tmp_ctx);
1947                 return;
1948         }
1949
1950
1951         for (i=0;i<nodemap->num;i++) {
1952                 if (nodemap->nodes[i].pnn == c->pnn) break;
1953         }
1954
1955         if (i == nodemap->num) {
1956                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
1957                 talloc_free(tmp_ctx);
1958                 return;
1959         }
1960
1961         if (c->old_flags != c->new_flags) {
1962                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1963         }
1964
1965         nodemap->nodes[i].flags = c->new_flags;
1966
1967         talloc_free(tmp_ctx);
1968 }
1969
1970 /*
1971   handler for when we need to push out flag changes to all other nodes
1972 */
1973 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
1974                                void *private_data)
1975 {
1976         struct ctdb_recoverd *rec = talloc_get_type(
1977                 private_data, struct ctdb_recoverd);
1978         struct ctdb_context *ctdb = rec->ctdb;
1979         int ret;
1980         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1981         struct ctdb_node_map_old *nodemap=NULL;
1982         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1983         uint32_t *nodes;
1984
1985         /* read the node flags from the recmaster */
1986         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
1987                                    tmp_ctx, &nodemap);
1988         if (ret != 0) {
1989                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
1990                 talloc_free(tmp_ctx);
1991                 return;
1992         }
1993         if (c->pnn >= nodemap->num) {
1994                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
1995                 talloc_free(tmp_ctx);
1996                 return;
1997         }
1998
1999         /* send the flags update to all connected nodes */
2000         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2001
2002         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2003                                       nodes, 0, CONTROL_TIMEOUT(),
2004                                       false, data,
2005                                       NULL, NULL,
2006                                       NULL) != 0) {
2007                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2008
2009                 talloc_free(tmp_ctx);
2010                 return;
2011         }
2012
2013         talloc_free(tmp_ctx);
2014 }
2015
2016
2017 struct verify_recmode_normal_data {
2018         uint32_t count;
2019         enum monitor_result status;
2020 };
2021
2022 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2023 {
2024         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2025
2026
2027         /* one more node has responded with recmode data */
2028         rmdata->count--;
2029
2030         /* if we failed to get the recmode, then return an error and let
2031            the main loop try again.
2032         */
2033         if (state->state != CTDB_CONTROL_DONE) {
2034                 if (rmdata->status == MONITOR_OK) {
2035                         rmdata->status = MONITOR_FAILED;
2036                 }
2037                 return;
2038         }
2039
2040         /* if we got a response, then the recmode will be stored in the
2041            status field
2042         */
2043         if (state->status != CTDB_RECOVERY_NORMAL) {
2044                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2045                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2046         }
2047
2048         return;
2049 }
2050
2051
2052 /* verify that all nodes are in normal recovery mode */
2053 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2054 {
2055         struct verify_recmode_normal_data *rmdata;
2056         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2057         struct ctdb_client_control_state *state;
2058         enum monitor_result status;
2059         int j;
2060         
2061         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2062         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2063         rmdata->count  = 0;
2064         rmdata->status = MONITOR_OK;
2065
2066         /* loop over all active nodes and send an async getrecmode call to
2067            them */
2068         for (j=0; j<nodemap->num; j++) {
2069                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2070                         continue;
2071                 }
2072                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2073                                         CONTROL_TIMEOUT(), 
2074                                         nodemap->nodes[j].pnn);
2075                 if (state == NULL) {
2076                         /* we failed to send the control, treat this as 
2077                            an error and try again next iteration
2078                         */                      
2079                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2080                         talloc_free(mem_ctx);
2081                         return MONITOR_FAILED;
2082                 }
2083
2084                 /* set up the callback functions */
2085                 state->async.fn = verify_recmode_normal_callback;
2086                 state->async.private_data = rmdata;
2087
2088                 /* one more control to wait for to complete */
2089                 rmdata->count++;
2090         }
2091
2092
2093         /* now wait for up to the maximum number of seconds allowed
2094            or until all nodes we expect a response from have replied
2095         */
2096         while (rmdata->count > 0) {
2097                 tevent_loop_once(ctdb->ev);
2098         }
2099
2100         status = rmdata->status;
2101         talloc_free(mem_ctx);
2102         return status;
2103 }
2104
2105
2106 struct verify_recmaster_data {
2107         struct ctdb_recoverd *rec;
2108         uint32_t count;
2109         uint32_t pnn;
2110         enum monitor_result status;
2111 };
2112
2113 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2114 {
2115         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2116
2117
2118         /* one more node has responded with recmaster data */
2119         rmdata->count--;
2120
2121         /* if we failed to get the recmaster, then return an error and let
2122            the main loop try again.
2123         */
2124         if (state->state != CTDB_CONTROL_DONE) {
2125                 if (rmdata->status == MONITOR_OK) {
2126                         rmdata->status = MONITOR_FAILED;
2127                 }
2128                 return;
2129         }
2130
2131         /* if we got a response, then the recmaster will be stored in the
2132            status field
2133         */
2134         if (state->status != rmdata->pnn) {
2135                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2136                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2137                 rmdata->status = MONITOR_ELECTION_NEEDED;
2138         }
2139
2140         return;
2141 }
2142
2143
2144 /* verify that all nodes agree that we are the recmaster */
2145 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2146 {
2147         struct ctdb_context *ctdb = rec->ctdb;
2148         struct verify_recmaster_data *rmdata;
2149         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2150         struct ctdb_client_control_state *state;
2151         enum monitor_result status;
2152         int j;
2153         
2154         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2155         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2156         rmdata->rec    = rec;
2157         rmdata->count  = 0;
2158         rmdata->pnn    = pnn;
2159         rmdata->status = MONITOR_OK;
2160
2161         /* loop over all active nodes and send an async getrecmaster call to
2162            them */
2163         for (j=0; j<nodemap->num; j++) {
2164                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2165                         continue;
2166                 }
2167                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2168                         continue;
2169                 }
2170                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2171                                         CONTROL_TIMEOUT(),
2172                                         nodemap->nodes[j].pnn);
2173                 if (state == NULL) {
2174                         /* we failed to send the control, treat this as 
2175                            an error and try again next iteration
2176                         */                      
2177                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2178                         talloc_free(mem_ctx);
2179                         return MONITOR_FAILED;
2180                 }
2181
2182                 /* set up the callback functions */
2183                 state->async.fn = verify_recmaster_callback;
2184                 state->async.private_data = rmdata;
2185
2186                 /* one more control to wait for to complete */
2187                 rmdata->count++;
2188         }
2189
2190
2191         /* now wait for up to the maximum number of seconds allowed
2192            or until all nodes we expect a response from have replied
2193         */
2194         while (rmdata->count > 0) {
2195                 tevent_loop_once(ctdb->ev);
2196         }
2197
2198         status = rmdata->status;
2199         talloc_free(mem_ctx);
2200         return status;
2201 }
2202
2203 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2204                                     struct ctdb_recoverd *rec)
2205 {
2206         struct ctdb_iface_list_old *ifaces = NULL;
2207         TALLOC_CTX *mem_ctx;
2208         bool ret = false;
2209
2210         mem_ctx = talloc_new(NULL);
2211
2212         /* Read the interfaces from the local node */
2213         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2214                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2215                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2216                 /* We could return an error.  However, this will be
2217                  * rare so we'll decide that the interfaces have
2218                  * actually changed, just in case.
2219                  */
2220                 talloc_free(mem_ctx);
2221                 return true;
2222         }
2223
2224         if (!rec->ifaces) {
2225                 /* We haven't been here before so things have changed */
2226                 DEBUG(DEBUG_NOTICE, ("Initial interfaces fetched\n"));
2227                 ret = true;
2228         } else if (rec->ifaces->num != ifaces->num) {
2229                 /* Number of interfaces has changed */
2230                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2231                                      rec->ifaces->num, ifaces->num));
2232                 ret = true;
2233         } else {
2234                 /* See if interface names or link states have changed */
2235                 int i;
2236                 for (i = 0; i < rec->ifaces->num; i++) {
2237                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2238                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2239                                 DEBUG(DEBUG_NOTICE,
2240                                       ("Interface in slot %d changed: %s => %s\n",
2241                                        i, iface->name, ifaces->ifaces[i].name));
2242                                 ret = true;
2243                                 break;
2244                         }
2245                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2246                                 DEBUG(DEBUG_NOTICE,
2247                                       ("Interface %s changed state: %d => %d\n",
2248                                        iface->name, iface->link_state,
2249                                        ifaces->ifaces[i].link_state));
2250                                 ret = true;
2251                                 break;
2252                         }
2253                 }
2254         }
2255
2256         talloc_free(rec->ifaces);
2257         rec->ifaces = talloc_steal(rec, ifaces);
2258
2259         talloc_free(mem_ctx);
2260         return ret;
2261 }
2262
2263 /* Check that the local allocation of public IP addresses is correct
2264  * and do some house-keeping */
2265 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2266                                       struct ctdb_recoverd *rec,
2267                                       uint32_t pnn,
2268                                       struct ctdb_node_map_old *nodemap)
2269 {
2270         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2271         int ret, j;
2272         bool need_takeover_run = false;
2273         struct ctdb_public_ip_list_old *ips = NULL;
2274
2275         /* If we are not the recmaster then do some housekeeping */
2276         if (rec->recmaster != pnn) {
2277                 /* Ignore any IP reallocate requests - only recmaster
2278                  * processes them
2279                  */
2280                 TALLOC_FREE(rec->reallocate_requests);
2281                 /* Clear any nodes that should be force rebalanced in
2282                  * the next takeover run.  If the recovery master role
2283                  * has moved then we don't want to process these some
2284                  * time in the future.
2285                  */
2286                 TALLOC_FREE(rec->force_rebalance_nodes);
2287         }
2288
2289         /* Return early if disabled... */
2290         if (ctdb->tunable.disable_ip_failover != 0 ||
2291             ctdb_op_is_disabled(rec->takeover_run)) {
2292                 return 0;
2293         }
2294
2295         if (interfaces_have_changed(ctdb, rec)) {
2296                 need_takeover_run = true;
2297         }
2298
2299         /* If there are unhosted IPs but this node can host them then
2300          * trigger an IP reallocation */
2301
2302         /* Read *available* IPs from local node */
2303         ret = ctdb_ctrl_get_public_ips_flags(
2304                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2305                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2306         if (ret != 0) {
2307                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2308                 talloc_free(mem_ctx);
2309                 return -1;
2310         }
2311
2312         for (j=0; j<ips->num; j++) {
2313                 if (ips->ips[j].pnn == -1 &&
2314                     nodemap->nodes[pnn].flags == 0) {
2315                         DEBUG(DEBUG_WARNING,
2316                               ("Unassigned IP %s can be served by this node\n",
2317                                ctdb_addr_to_str(&ips->ips[j].addr)));
2318                         need_takeover_run = true;
2319                 }
2320         }
2321
2322         talloc_free(ips);
2323
2324         if (!ctdb->do_checkpublicip) {
2325                 goto done;
2326         }
2327
2328         /* Validate the IP addresses that this node has on network
2329          * interfaces.  If there is an inconsistency between reality
2330          * and the state expected by CTDB then try to fix it by
2331          * triggering an IP reallocation or releasing extraneous IP
2332          * addresses. */
2333
2334         /* Read *known* IPs from local node */
2335         ret = ctdb_ctrl_get_public_ips_flags(
2336                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2337         if (ret != 0) {
2338                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2339                 talloc_free(mem_ctx);
2340                 return -1;
2341         }
2342
2343         for (j=0; j<ips->num; j++) {
2344                 if (ips->ips[j].pnn == pnn) {
2345                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2346                                 DEBUG(DEBUG_ERR,
2347                                       ("Assigned IP %s not on an interface\n",
2348                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2349                                 need_takeover_run = true;
2350                         }
2351                 } else {
2352                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2353                                 DEBUG(DEBUG_ERR,
2354                                       ("IP %s incorrectly on an interface\n",
2355                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2356                                 need_takeover_run = true;
2357                         }
2358                 }
2359         }
2360
2361 done:
2362         if (need_takeover_run) {
2363                 struct ctdb_srvid_message rd;
2364                 TDB_DATA data;
2365
2366                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2367
2368                 ZERO_STRUCT(rd);
2369                 rd.pnn = ctdb->pnn;
2370                 rd.srvid = 0;
2371                 data.dptr = (uint8_t *)&rd;
2372                 data.dsize = sizeof(rd);
2373
2374                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2375                 if (ret != 0) {
2376                         DEBUG(DEBUG_ERR,
2377                               ("Failed to send takeover run request\n"));
2378                 }
2379         }
2380         talloc_free(mem_ctx);
2381         return 0;
2382 }
2383
2384
2385 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2386 {
2387         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2388
2389         if (node_pnn >= ctdb->num_nodes) {
2390                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2391                 return;
2392         }
2393
2394         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2395
2396 }
2397
2398 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2399         struct ctdb_node_map_old *nodemap,
2400         struct ctdb_node_map_old **remote_nodemaps)
2401 {
2402         uint32_t *nodes;
2403
2404         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2406                                         nodes, 0,
2407                                         CONTROL_TIMEOUT(), false, tdb_null,
2408                                         async_getnodemap_callback,
2409                                         NULL,
2410                                         remote_nodemaps) != 0) {
2411                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2412
2413                 return -1;
2414         }
2415
2416         return 0;
2417 }
2418
2419 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2420                                      TALLOC_CTX *mem_ctx)
2421 {
2422         struct ctdb_context *ctdb = rec->ctdb;
2423         uint32_t pnn = ctdb_get_pnn(ctdb);
2424         struct ctdb_node_map_old *nodemap = rec->nodemap;
2425         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2426         int ret;
2427
2428         /* When the recovery daemon is started, recmaster is set to
2429          * "unknown" so it knows to start an election.
2430          */
2431         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2432                 DEBUG(DEBUG_NOTICE,
2433                       ("Initial recovery master set - forcing election\n"));
2434                 force_election(rec, pnn, nodemap);
2435                 return false;
2436         }
2437
2438         /*
2439          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2440          * but we have, then force an election and try to become the new
2441          * recmaster.
2442          */
2443         if (!ctdb_node_has_capabilities(rec->caps,
2444                                         rec->recmaster,
2445                                         CTDB_CAP_RECMASTER) &&
2446             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2447             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2448                 DEBUG(DEBUG_ERR,
2449                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2450                        " but we (node %u) have - force an election\n",
2451                        rec->recmaster, pnn));
2452                 force_election(rec, pnn, nodemap);
2453                 return false;
2454         }
2455
2456         /* Verify that the master node has not been deleted.  This
2457          * should not happen because a node should always be shutdown
2458          * before being deleted, causing a new master to be elected
2459          * before now.  However, if something strange has happened
2460          * then checking here will ensure we don't index beyond the
2461          * end of the nodemap array. */
2462         if (rec->recmaster >= nodemap->num) {
2463                 DEBUG(DEBUG_ERR,
2464                       ("Recmaster node %u has been deleted. Force election\n",
2465                        rec->recmaster));
2466                 force_election(rec, pnn, nodemap);
2467                 return false;
2468         }
2469
2470         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2471         if (nodemap->nodes[rec->recmaster].flags &
2472             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2473                 DEBUG(DEBUG_NOTICE,
2474                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2475                        rec->recmaster));
2476                 force_election(rec, pnn, nodemap);
2477                 return false;
2478         }
2479
2480         /* get nodemap from the recovery master to check if it is inactive */
2481         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2482                                    mem_ctx, &recmaster_nodemap);
2483         if (ret != 0) {
2484                 DEBUG(DEBUG_ERR,
2485                       (__location__
2486                        " Unable to get nodemap from recovery master %u\n",
2487                           rec->recmaster));
2488                 /* No election, just error */
2489                 return false;
2490         }
2491
2492
2493         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2494             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2495                 DEBUG(DEBUG_NOTICE,
2496                       ("Recmaster node %u is inactive. Force election\n",
2497                        rec->recmaster));
2498                 /*
2499                  * update our nodemap to carry the recmaster's notion of
2500                  * its own flags, so that we don't keep freezing the
2501                  * inactive recmaster node...
2502                  */
2503                 nodemap->nodes[rec->recmaster].flags =
2504                         recmaster_nodemap->nodes[rec->recmaster].flags;
2505                 force_election(rec, pnn, nodemap);
2506                 return false;
2507         }
2508
2509         return true;
2510 }
2511
2512 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2513                       TALLOC_CTX *mem_ctx)
2514 {
2515         uint32_t pnn;
2516         struct ctdb_node_map_old *nodemap=NULL;
2517         struct ctdb_node_map_old **remote_nodemaps=NULL;
2518         struct ctdb_vnn_map *vnnmap=NULL;
2519         struct ctdb_vnn_map *remote_vnnmap=NULL;
2520         uint32_t num_lmasters;
2521         int32_t debug_level;
2522         int i, j, ret;
2523         bool self_ban;
2524
2525
2526         /* verify that the main daemon is still running */
2527         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2528                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2529                 exit(-1);
2530         }
2531
2532         /* ping the local daemon to tell it we are alive */
2533         ctdb_ctrl_recd_ping(ctdb);
2534
2535         if (rec->election_timeout) {
2536                 /* an election is in progress */
2537                 return;
2538         }
2539
2540         /* read the debug level from the parent and update locally */
2541         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2542         if (ret != 0) {
2543                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2544                 return;
2545         }
2546         DEBUGLEVEL = debug_level;
2547
2548         /* get relevant tunables */
2549         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2550         if (ret != 0) {
2551                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2552                 return;
2553         }
2554
2555         /* get runstate */
2556         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2557                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2558         if (ret != 0) {
2559                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2560                 return;
2561         }
2562
2563         pnn = ctdb_get_pnn(ctdb);
2564
2565         /* get nodemap */
2566         TALLOC_FREE(rec->nodemap);
2567         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2568         if (ret != 0) {
2569                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2570                 return;
2571         }
2572         nodemap = rec->nodemap;
2573
2574         /* remember our own node flags */
2575         rec->node_flags = nodemap->nodes[pnn].flags;
2576
2577         ban_misbehaving_nodes(rec, &self_ban);
2578         if (self_ban) {
2579                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2580                 return;
2581         }
2582
2583         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2584            also frozen and that the recmode is set to active.
2585         */
2586         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2587                 /* If this node has become inactive then we want to
2588                  * reduce the chances of it taking over the recovery
2589                  * master role when it becomes active again.  This
2590                  * helps to stabilise the recovery master role so that
2591                  * it stays on the most stable node.
2592                  */
2593                 rec->priority_time = timeval_current();
2594
2595                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2596                 if (ret != 0) {
2597                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2598                 }
2599                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2600                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2601
2602                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2603                         if (ret != 0) {
2604                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2605
2606                                 return;
2607                         }
2608                 }
2609                 if (! rec->frozen_on_inactive) {
2610                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2611                                                CTDB_CURRENT_NODE);
2612                         if (ret != 0) {
2613                                 DEBUG(DEBUG_ERR,
2614                                       (__location__ " Failed to freeze node "
2615                                        "in STOPPED or BANNED state\n"));
2616                                 return;
2617                         }
2618
2619                         rec->frozen_on_inactive = true;
2620                 }
2621
2622                 /* If this node is stopped or banned then it is not the recovery
2623                  * master, so don't do anything. This prevents stopped or banned
2624                  * node from starting election and sending unnecessary controls.
2625                  */
2626                 return;
2627         }
2628
2629         rec->frozen_on_inactive = false;
2630
2631         /* Retrieve capabilities from all connected nodes */
2632         ret = update_capabilities(rec, nodemap);
2633         if (ret != 0) {
2634                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2635                 return;
2636         }
2637
2638         if (! validate_recovery_master(rec, mem_ctx)) {
2639                 return;
2640         }
2641
2642         /* Check if an IP takeover run is needed and trigger one if
2643          * necessary */
2644         verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2645
2646         /* if we are not the recmaster then we do not need to check
2647            if recovery is needed
2648          */
2649         if (pnn != rec->recmaster) {
2650                 return;
2651         }
2652
2653
2654         /* ensure our local copies of flags are right */
2655         ret = update_local_flags(rec, nodemap);
2656         if (ret != 0) {
2657                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2658                 return;
2659         }
2660
2661         if (ctdb->num_nodes != nodemap->num) {
2662                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2663                 ctdb_load_nodes_file(ctdb);
2664                 return;
2665         }
2666
2667         /* verify that all active nodes agree that we are the recmaster */
2668         switch (verify_recmaster(rec, nodemap, pnn)) {
2669         case MONITOR_RECOVERY_NEEDED:
2670                 /* can not happen */
2671                 return;
2672         case MONITOR_ELECTION_NEEDED:
2673                 force_election(rec, pnn, nodemap);
2674                 return;
2675         case MONITOR_OK:
2676                 break;
2677         case MONITOR_FAILED:
2678                 return;
2679         }
2680
2681
2682         /* get the vnnmap */
2683         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2684         if (ret != 0) {
2685                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2686                 return;
2687         }
2688
2689         if (rec->need_recovery) {
2690                 /* a previous recovery didn't finish */
2691                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2692                 return;
2693         }
2694
2695         /* verify that all active nodes are in normal mode 
2696            and not in recovery mode 
2697         */
2698         switch (verify_recmode(ctdb, nodemap)) {
2699         case MONITOR_RECOVERY_NEEDED:
2700                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2701                 return;
2702         case MONITOR_FAILED:
2703                 return;
2704         case MONITOR_ELECTION_NEEDED:
2705                 /* can not happen */
2706         case MONITOR_OK:
2707                 break;
2708         }
2709
2710
2711         if (ctdb->recovery_lock != NULL) {
2712                 /* We must already hold the recovery lock */
2713                 if (!ctdb_recovery_have_lock(rec)) {
2714                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2715                         ctdb_set_culprit(rec, ctdb->pnn);
2716                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2717                         return;
2718                 }
2719         }
2720
2721
2722         /* If recoveries are disabled then there is no use doing any
2723          * nodemap or flags checks.  Recoveries might be disabled due
2724          * to "reloadnodes", so doing these checks might cause an
2725          * unnecessary recovery.  */
2726         if (ctdb_op_is_disabled(rec->recovery)) {
2727                 goto takeover_run_checks;
2728         }
2729
2730         /* get the nodemap for all active remote nodes
2731          */
2732         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2733         if (remote_nodemaps == NULL) {
2734                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2735                 return;
2736         }
2737         for(i=0; i<nodemap->num; i++) {
2738                 remote_nodemaps[i] = NULL;
2739         }
2740         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2741                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2742                 return;
2743         } 
2744
2745         /* verify that all other nodes have the same nodemap as we have
2746         */
2747         for (j=0; j<nodemap->num; j++) {
2748                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2749                         continue;
2750                 }
2751
2752                 if (remote_nodemaps[j] == NULL) {
2753                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2754                         ctdb_set_culprit(rec, j);
2755
2756                         return;
2757                 }
2758
2759                 /* if the nodes disagree on how many nodes there are
2760                    then this is a good reason to try recovery
2761                  */
2762                 if (remote_nodemaps[j]->num != nodemap->num) {
2763                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has a different node count: %u vs our %u\n",
2764                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2765                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2766                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2767                         return;
2768                 }
2769
2770                 /* if the nodes disagree on which nodes exist and are
2771                    active, then that is also a good reason to do recovery
2772                  */
2773                 for (i=0;i<nodemap->num;i++) {
2774                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2775                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2776                                           nodemap->nodes[j].pnn, i, 
2777                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2778                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2779                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2780                                             vnnmap);
2781                                 return;
2782                         }
2783                 }
2784         }
2785
2786         /*
2787          * Update node flags obtained from each active node. This ensures we have
2788          * up-to-date information about all the nodes.
2789          */
2790         for (j=0; j<nodemap->num; j++) {
2791                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2792                         continue;
2793                 }
2794                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2795         }
2796
2797         for (j=0; j<nodemap->num; j++) {
2798                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2799                         continue;
2800                 }
2801
2802                 /* verify the flags are consistent
2803                 */
2804                 for (i=0; i<nodemap->num; i++) {
2805                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2806                                 continue;
2807                         }
2808                         
2809                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2810                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2811                                   nodemap->nodes[j].pnn, 
2812                                   nodemap->nodes[i].pnn, 
2813                                   remote_nodemaps[j]->nodes[i].flags,
2814                                   nodemap->nodes[i].flags));
2815                                 if (i == j) {
2816                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2817                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2818                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2819                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2820                                                     vnnmap);
2821                                         return;
2822                                 } else {
2823                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2824                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2825                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2826                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2827                                                     vnnmap);
2828                                         return;
2829                                 }
2830                         }
2831                 }
2832         }
2833
2834
2835         /* count how many active nodes with the lmaster capability there are */
2836         num_lmasters  = 0;
2837         for (i=0; i<nodemap->num; i++) {
2838                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2839                         if (ctdb_node_has_capabilities(rec->caps,
2840                                                        ctdb->nodes[i]->pnn,
2841                                                        CTDB_CAP_LMASTER)) {
2842                                 num_lmasters++;
2843                         }
2844                 }
2845         }
2846
2847
2848         /* There must be the same number of lmasters in the vnn map as
2849          * there are active nodes with the lmaster capability...  or
2850          * do a recovery.
2851          */
2852         if (vnnmap->size != num_lmasters) {
2853                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2854                           vnnmap->size, num_lmasters));
2855                 ctdb_set_culprit(rec, ctdb->pnn);
2856                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2857                 return;
2858         }
2859
2860         /* verify that all active nodes in the nodemap also exist in 
2861            the vnnmap.
2862          */
2863         for (j=0; j<nodemap->num; j++) {
2864                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2865                         continue;
2866                 }
2867                 if (nodemap->nodes[j].pnn == pnn) {
2868                         continue;
2869                 }
2870
2871                 for (i=0; i<vnnmap->size; i++) {
2872                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2873                                 break;
2874                         }
2875                 }
2876                 if (i == vnnmap->size) {
2877                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n",
2878                                   nodemap->nodes[j].pnn));
2879                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2880                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2881                         return;
2882                 }
2883         }
2884
2885         
2886         /* verify that all other nodes have the same vnnmap and are from the
2887            same generation (the generation changes on every recovery)
2888          */
2889         for (j=0; j<nodemap->num; j++) {
2890                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2891                         continue;
2892                 }
2893                 if (nodemap->nodes[j].pnn == pnn) {
2894                         continue;
2895                 }
2896
2897                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2898                                           mem_ctx, &remote_vnnmap);
2899                 if (ret != 0) {
2900                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2901                                   nodemap->nodes[j].pnn));
2902                         return;
2903                 }
2904
2905                 /* verify the vnnmap generation is the same */
2906                 if (vnnmap->generation != remote_vnnmap->generation) {
2907                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap generation: %u vs %u (ours)\n",
2908                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2909                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2910                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2911                         return;
2912                 }
2913
2914                 /* verify the vnnmap size is the same */
2915                 if (vnnmap->size != remote_vnnmap->size) {
2916                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap size: %u vs %u (ours)\n",
2917                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2918                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2919                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2920                         return;
2921                 }
2922
2923                 /* verify the vnnmap is the same */
2924                 for (i=0; i<vnnmap->size; i++) {
2925                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2926                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap contents\n",
2927                                           nodemap->nodes[j].pnn));
2928                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2929                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2930                                             vnnmap);
2931                                 return;
2932                         }
2933                 }
2934         }
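        /* If we get here the monitoring pass found a consistent cluster:
         * flags, lmaster count, vnnmap membership, generation and contents
         * all agree, so no recovery is needed on this iteration. */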
2935
2936         /* FIXME: Add remote public IP checking to ensure that nodes
2937          * have the IP addresses that are allocated to them. */
2938
2939 takeover_run_checks:
2940
2941         /* If there are IP takeover runs requested or the previous one
2942          * failed then perform one and notify the waiters */
2943         if (!ctdb_op_is_disabled(rec->takeover_run) &&
2944             (rec->reallocate_requests || rec->need_takeover_run)) {
2945                 process_ipreallocate_requests(ctdb, rec);
2946         }
2947 }
2948
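/* SIGTERM: drop the recovery lock before exiting.  Exiting would normally
 * release it anyway (the lock is typically held via a cluster mutex helper),
 * but unlocking explicitly makes the recmaster handover immediate. */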
2949 static void recd_sig_term_handler(struct tevent_context *ev,
2950                                   struct tevent_signal *se, int signum,
2951                                   int count, void *dont_care,
2952                                   void *private_data)
2953 {
2954         struct ctdb_recoverd *rec = talloc_get_type_abort(
2955                 private_data, struct ctdb_recoverd);
2956
2957         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
2958         ctdb_recovery_unlock(rec);
2959         exit(0);
2960 }
2961
2962
2963 /*
2964   the main monitoring loop
2965  */
2966 static void monitor_cluster(struct ctdb_context *ctdb)
2967 {
2968         struct tevent_signal *se;
2969         struct ctdb_recoverd *rec;
2970
2971         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2972
2973         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2974         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2975
2976         rec->ctdb = ctdb;
2977         rec->recmaster = CTDB_UNKNOWN_PNN;
2978         rec->recovery_lock_handle = NULL;
2979
2980         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
2981         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
2982
2983         rec->recovery = ctdb_op_init(rec, "recoveries");
2984         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
2985
2986         rec->priority_time = timeval_current();
2987         rec->frozen_on_inactive = false;
2988
2989         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
2990                                recd_sig_term_handler, rec);
2991         if (se == NULL) {
2992                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
2993                 exit(1);
2994         }
2995
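        /* Register the recovery daemon's message ports.  Each CTDB_SRVID_*
         * value below is a well-known 64-bit service id; when a message
         * addressed to our PNN carries that id, the client library invokes
         * the matching handler.  A minimal sketch of the sending side,
         * assuming the caller already has a client context and a target PNN
         * (the empty payload is illustrative only):
         *
         *   TDB_DATA data = { .dptr = NULL, .dsize = 0 };
         *   ctdb_client_send_message(ctdb, pnn, CTDB_SRVID_TAKEOVER_RUN, data);
         */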
2996         /* register a message port for sending memory dumps */
2997         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2998
2999         /* when a node is assigned banning credits */
3000         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3001                                         banning_handler, rec);
3002
3003         /* register a message port for recovery elections */
3004         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3005
3006         /* when nodes are disabled/enabled */
3007         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3008
3009         /* when we are asked to push out a flag change */
3010         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3011
3012         /* register a message port for vacuum fetch */
3013         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3014
3015         /* register a message port for reloadnodes  */
3016         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3017
3018         /* register a message port for performing a takeover run */
3019         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3020
3021         /* register a message port for disabling the ip check for a short while */
3022         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3023
3024         /* register a message port for forcing a rebalance of a node at the
3025            next reallocation */
3026         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3027
3028         /* Register a message port for disabling takeover runs */
3029         ctdb_client_set_message_handler(ctdb,
3030                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3031                                         disable_takeover_runs_handler, rec);
3032
3033         /* Register a message port for disabling recoveries */
3034         ctdb_client_set_message_handler(ctdb,
3035                                         CTDB_SRVID_DISABLE_RECOVERIES,
3036                                         disable_recoveries_handler, rec);
3037
3038         /* register a message port for detaching database */
3039         ctdb_client_set_message_handler(ctdb,
3040                                         CTDB_SRVID_DETACH_DATABASE,
3041                                         detach_database_handler, rec);
3042
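        /* Main monitoring loop: each pass gets a throwaway talloc context so
         * that everything main_loop() allocates is reclaimed in one free. */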
3043         for (;;) {
3044                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3045                 struct timeval start;
3046                 double elapsed;
3047
3048                 if (!mem_ctx) {
3049                         DEBUG(DEBUG_CRIT,(__location__
3050                                           " Failed to create temp context\n"));
3051                         exit(-1);
3052                 }
3053
3054                 start = timeval_current();
3055                 main_loop(ctdb, rec, mem_ctx);
3056                 talloc_free(mem_ctx);
3057
3058                 /* pace the loop: run at most once every recover_interval seconds */
3059                 elapsed = timeval_elapsed(&start);
3060                 if (elapsed < ctdb->tunable.recover_interval) {
3061                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3062                                           - elapsed);
3063                 }
3064         }
3065 }
3066
3067 /*
3068   event handler for when the main ctdbd dies
3069  */
3070 static void ctdb_recoverd_parent(struct tevent_context *ev,
3071                                  struct tevent_fd *fde,
3072                                  uint16_t flags, void *private_data)
3073 {
3074         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3075         _exit(1);
3076 }
3077
3078 /*
3079   called regularly to verify that the recovery daemon is still running
3080  */
3081 static void ctdb_check_recd(struct tevent_context *ev,
3082                             struct tevent_timer *te,
3083                             struct timeval yt, void *p)
3084 {
3085         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3086
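        /* Signal 0 follows kill(2) semantics: it performs the existence and
         * permission checks without delivering anything, so a non-zero
         * result means the recovery daemon process is gone. */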
3087         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3088                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3089
3090                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3091                                  ctdb_restart_recd, ctdb);
3092
3093                 return;
3094         }
3095
3096         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3097                          timeval_current_ofs(30, 0),
3098                          ctdb_check_recd, ctdb);
3099 }
3100
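/* SIGCHLD: reap every exited child without blocking.  With WNOHANG,
 * waitpid() returns 0 once no more zombies are pending and -1 with
 * errno == ECHILD when this process has no children at all. */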
3101 static void recd_sig_child_handler(struct tevent_context *ev,
3102                                    struct tevent_signal *se, int signum,
3103                                    int count, void *dont_care,
3104                                    void *private_data)
3105 {
3107         int status;
3108         pid_t pid = -1;
3109
3110         while (pid != 0) {
3111                 pid = waitpid(-1, &status, WNOHANG);
3112                 if (pid == -1) {
3113                         if (errno != ECHILD) {
3114                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3115                         }
3116                         return;
3117                 }
3118                 if (pid > 0) {
3119                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3120                 }
3121         }
3122 }
3123
3124 /*
3125   start the recovery daemon as a child of the main ctdb daemon
3126  */
3127 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3128 {
3129         int fd[2];
3130         struct tevent_signal *se;
3131         struct tevent_fd *fde;
3132         int ret;
3133
3134         if (pipe(fd) != 0) {
3135                 return -1;
3136         }
3137
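        /* Fork the recovery daemon.  The parent (main ctdbd) keeps the write
         * end of the pipe open and arms a periodic liveness check; the child
         * keeps the read end so it can detect parent death, then switches to
         * client mode and enters the monitoring loop. */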
3138         ctdb->recoverd_pid = ctdb_fork(ctdb);
3139         if (ctdb->recoverd_pid == -1) {
3140                 return -1;
3141         }
3142
3143         if (ctdb->recoverd_pid != 0) {
3144                 talloc_free(ctdb->recd_ctx);
3145                 ctdb->recd_ctx = talloc_new(ctdb);
3146                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3147
3148                 close(fd[0]);
3149                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3150                                  timeval_current_ofs(30, 0),
3151                                  ctdb_check_recd, ctdb);
3152                 return 0;
3153         }
3154
3155         close(fd[1]);
3156
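        /* Re-seed the PRNG in the child; mixing in the pid keeps the
         * sequence distinct from the parent and any sibling forks. */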
3157         srandom(getpid() ^ time(NULL));
3158
3159         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3160         if (ret != 0) {
3161                 return -1;
3162         }
3163
3164         prctl_set_comment("ctdb_recoverd");
3165         if (switch_from_server_to_client(ctdb) != 0) {
3166                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3167                 exit(1);
3168         }
3169
3170         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3171
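        /* Watch the read end of the pipe.  The parent never closes its write
         * end, so fd[0] only becomes readable (EOF) when the main ctdbd
         * exits, at which point ctdb_recoverd_parent() terminates us too. */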
3172         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3173                             ctdb_recoverd_parent, &fd[0]);
3174         tevent_fd_set_auto_close(fde);
3175
3176         /* set up a handler to pick up sigchld */
3177         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3178                                recd_sig_child_handler, ctdb);
3179         if (se == NULL) {
3180                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3181                 exit(1);
3182         }
3183
3184         monitor_cluster(ctdb);
3185
3186         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3187         return -1;
3188 }
3189
3190 /*
3191   shutdown the recovery daemon
3192  */
3193 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3194 {
3195         if (ctdb->recoverd_pid == 0) {
3196                 return;
3197         }
3198
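        /* SIGTERM is caught by recd_sig_term_handler() in the recovery
         * daemon, which releases the recovery lock and exits cleanly. */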
3199         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3200         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3201
3202         TALLOC_FREE(ctdb->recd_ctx);
3203         TALLOC_FREE(ctdb->recd_ping_count);
3204 }
3205
3206 static void ctdb_restart_recd(struct tevent_context *ev,
3207                               struct tevent_timer *te,
3208                               struct timeval t, void *private_data)
3209 {
3210         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3211
3212         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3213         ctdb_stop_recoverd(ctdb);
3214         ctdb_start_recoverd(ctdb);
3215 }