ctdb-config: Switch tunable DisableIPFailover to a config option
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36 #include "lib/util/util_process.h"
37
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/system_socket.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "server/ctdb_config.h"
46
47 #include "ctdb_cluster_mutex.h"
48
49 /* List of SRVID requests that need to be processed */
50 struct srvid_list {
51         struct srvid_list *next, *prev;
52         struct ctdb_srvid_message *request;
53 };
54
55 struct srvid_requests {
56         struct srvid_list *requests;
57 };
58
59 static void srvid_request_reply(struct ctdb_context *ctdb,
60                                 struct ctdb_srvid_message *request,
61                                 TDB_DATA result)
62 {
63         /* Someone that sent srvid==0 does not want a reply */
64         if (request->srvid == 0) {
65                 talloc_free(request);
66                 return;
67         }
68
69         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
70                                      result) == 0) {
71                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
72                                   (unsigned)request->pnn,
73                                   (unsigned long long)request->srvid));
74         } else {
75                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
76                                  (unsigned)request->pnn,
77                                  (unsigned long long)request->srvid));
78         }
79
80         talloc_free(request);
81 }
82
83 static void srvid_requests_reply(struct ctdb_context *ctdb,
84                                  struct srvid_requests **requests,
85                                  TDB_DATA result)
86 {
87         struct srvid_list *r;
88
89         if (*requests == NULL) {
90                 return;
91         }
92
93         for (r = (*requests)->requests; r != NULL; r = r->next) {
94                 srvid_request_reply(ctdb, r->request, result);
95         }
96
97         /* Free the list structure... */
98         TALLOC_FREE(*requests);
99 }
100
101 static void srvid_request_add(struct ctdb_context *ctdb,
102                               struct srvid_requests **requests,
103                               struct ctdb_srvid_message *request)
104 {
105         struct srvid_list *t;
106         int32_t ret;
107         TDB_DATA result;
108
109         if (*requests == NULL) {
110                 *requests = talloc_zero(ctdb, struct srvid_requests);
111                 if (*requests == NULL) {
112                         goto nomem;
113                 }
114         }
115
116         t = talloc_zero(*requests, struct srvid_list);
117         if (t == NULL) {
118                 /* If *requests was just allocated above then free it */
119                 if ((*requests)->requests == NULL) {
120                         TALLOC_FREE(*requests);
121                 }
122                 goto nomem;
123         }
124
125         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
126         DLIST_ADD((*requests)->requests, t);
127
128         return;
129
130 nomem:
131         /* Failed to add the request to the list.  Send a fail. */
132         DEBUG(DEBUG_ERR, (__location__
133                           " Out of memory, failed to queue SRVID request\n"));
134         ret = -ENOMEM;
135         result.dsize = sizeof(ret);
136         result.dptr = (uint8_t *)&ret;
137         srvid_request_reply(ctdb, request, result);
138 }
139
140 /* An abstraction to allow an operation (takeover runs, recoveries,
141  * ...) to be disabled for a given timeout */
142 struct ctdb_op_state {
143         struct tevent_timer *timer;
144         bool in_progress;
145         const char *name;
146 };
147
148 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
149 {
150         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
151
152         if (state != NULL) {
153                 state->in_progress = false;
154                 state->name = name;
155         }
156
157         return state;
158 }
159
160 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
161 {
162         return state->timer != NULL;
163 }
164
165 static bool ctdb_op_begin(struct ctdb_op_state *state)
166 {
167         if (ctdb_op_is_disabled(state)) {
168                 DEBUG(DEBUG_NOTICE,
169                       ("Unable to begin - %s are disabled\n", state->name));
170                 return false;
171         }
172
173         state->in_progress = true;
174         return true;
175 }
176
177 static bool ctdb_op_end(struct ctdb_op_state *state)
178 {
179         return state->in_progress = false;
180 }
181
182 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
183 {
184         return state->in_progress;
185 }
186
187 static void ctdb_op_enable(struct ctdb_op_state *state)
188 {
189         TALLOC_FREE(state->timer);
190 }
191
192 static void ctdb_op_timeout_handler(struct tevent_context *ev,
193                                     struct tevent_timer *te,
194                                     struct timeval yt, void *p)
195 {
196         struct ctdb_op_state *state =
197                 talloc_get_type(p, struct ctdb_op_state);
198
199         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
200         ctdb_op_enable(state);
201 }
202
203 static int ctdb_op_disable(struct ctdb_op_state *state,
204                            struct tevent_context *ev,
205                            uint32_t timeout)
206 {
207         if (timeout == 0) {
208                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
209                 ctdb_op_enable(state);
210                 return 0;
211         }
212
213         if (state->in_progress) {
214                 DEBUG(DEBUG_ERR,
215                       ("Unable to disable %s - in progress\n", state->name));
216                 return -EAGAIN;
217         }
218
219         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
220                             state->name, timeout));
221
222         /* Clear any old timers */
223         talloc_free(state->timer);
224
225         /* Arrange for the timeout to occur */
226         state->timer = tevent_add_timer(ev, state,
227                                         timeval_current_ofs(timeout, 0),
228                                         ctdb_op_timeout_handler, state);
229         if (state->timer == NULL) {
230                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
231                 return -ENOMEM;
232         }
233
234         return 0;
235 }
236
237 struct ctdb_banning_state {
238         uint32_t count;
239         struct timeval last_reported_time;
240 };
241
242 /*
243   private state of recovery daemon
244  */
245 struct ctdb_recoverd {
246         struct ctdb_context *ctdb;
247         uint32_t recmaster;
248         uint32_t last_culprit_node;
249         struct ctdb_node_map_old *nodemap;
250         struct timeval priority_time;
251         bool need_takeover_run;
252         bool need_recovery;
253         uint32_t node_flags;
254         struct tevent_timer *send_election_te;
255         struct tevent_timer *election_timeout;
256         struct srvid_requests *reallocate_requests;
257         struct ctdb_op_state *takeover_run;
258         struct ctdb_op_state *recovery;
259         struct ctdb_iface_list_old *ifaces;
260         uint32_t *force_rebalance_nodes;
261         struct ctdb_node_capabilities *caps;
262         bool frozen_on_inactive;
263         struct ctdb_cluster_mutex_handle *recovery_lock_handle;
264 };
265
266 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
267 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
268
269 static void ctdb_restart_recd(struct tevent_context *ev,
270                               struct tevent_timer *te, struct timeval t,
271                               void *private_data);
272
273 /*
274   ban a node for a period of time
275  */
276 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
277 {
278         int ret;
279         struct ctdb_context *ctdb = rec->ctdb;
280         struct ctdb_ban_state bantime;
281
282         if (!ctdb_validate_pnn(ctdb, pnn)) {
283                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
284                 return;
285         }
286
287         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
288
289         bantime.pnn  = pnn;
290         bantime.time = ban_time;
291
292         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
293         if (ret != 0) {
294                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
295                 return;
296         }
297
298 }
299
300 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
301
302
303 /*
304   remember the trouble maker
305  */
306 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
307 {
308         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
309         struct ctdb_banning_state *ban_state;
310
311         if (culprit > ctdb->num_nodes) {
312                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
313                 return;
314         }
315
316         /* If we are banned or stopped, do not set other nodes as culprits */
317         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
318                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
319                 return;
320         }
321
322         if (ctdb->nodes[culprit]->ban_state == NULL) {
323                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
324                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
325
326                 
327         }
328         ban_state = ctdb->nodes[culprit]->ban_state;
329         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
330                 /* this was the first time in a long while this node
331                    misbehaved so we will forgive any old transgressions.
332                 */
333                 ban_state->count = 0;
334         }
335
336         ban_state->count += count;
337         ban_state->last_reported_time = timeval_current();
338         rec->last_culprit_node = culprit;
339 }
340
341 /*
342   remember the trouble maker
343  */
344 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
345 {
346         ctdb_set_culprit_count(rec, culprit, 1);
347 }
348
349 /*
350   Retrieve capabilities from all connected nodes
351  */
352 static int update_capabilities(struct ctdb_recoverd *rec,
353                                struct ctdb_node_map_old *nodemap)
354 {
355         uint32_t *capp;
356         TALLOC_CTX *tmp_ctx;
357         struct ctdb_node_capabilities *caps;
358         struct ctdb_context *ctdb = rec->ctdb;
359
360         tmp_ctx = talloc_new(rec);
361         CTDB_NO_MEMORY(ctdb, tmp_ctx);
362
363         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
364                                      CONTROL_TIMEOUT(), nodemap);
365
366         if (caps == NULL) {
367                 DEBUG(DEBUG_ERR,
368                       (__location__ " Failed to get node capabilities\n"));
369                 talloc_free(tmp_ctx);
370                 return -1;
371         }
372
373         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
374         if (capp == NULL) {
375                 DEBUG(DEBUG_ERR,
376                       (__location__
377                        " Capabilities don't include current node.\n"));
378                 talloc_free(tmp_ctx);
379                 return -1;
380         }
381         ctdb->capabilities = *capp;
382
383         TALLOC_FREE(rec->caps);
384         rec->caps = talloc_steal(rec, caps);
385
386         talloc_free(tmp_ctx);
387         return 0;
388 }
389
390 /*
391   change recovery mode on all nodes
392  */
393 static int set_recovery_mode(struct ctdb_context *ctdb,
394                              struct ctdb_recoverd *rec,
395                              struct ctdb_node_map_old *nodemap,
396                              uint32_t rec_mode)
397 {
398         TDB_DATA data;
399         uint32_t *nodes;
400         TALLOC_CTX *tmp_ctx;
401
402         tmp_ctx = talloc_new(ctdb);
403         CTDB_NO_MEMORY(ctdb, tmp_ctx);
404
405         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
406
407         data.dsize = sizeof(uint32_t);
408         data.dptr = (unsigned char *)&rec_mode;
409
410         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
411                                         nodes, 0,
412                                         CONTROL_TIMEOUT(),
413                                         false, data,
414                                         NULL, NULL,
415                                         NULL) != 0) {
416                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
417                 talloc_free(tmp_ctx);
418                 return -1;
419         }
420
421         talloc_free(tmp_ctx);
422         return 0;
423 }
424
425 /*
426   ensure all other nodes have attached to any databases that we have
427  */
428 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
429                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
430 {
431         int i, j, db, ret;
432         struct ctdb_dbid_map_old *remote_dbmap;
433
434         /* verify that all other nodes have all our databases */
435         for (j=0; j<nodemap->num; j++) {
436                 /* we don't need to ourself ourselves */
437                 if (nodemap->nodes[j].pnn == pnn) {
438                         continue;
439                 }
440                 /* don't check nodes that are unavailable */
441                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
442                         continue;
443                 }
444
445                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
446                                          mem_ctx, &remote_dbmap);
447                 if (ret != 0) {
448                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
449                         return -1;
450                 }
451
452                 /* step through all local databases */
453                 for (db=0; db<dbmap->num;db++) {
454                         const char *name;
455
456
457                         for (i=0;i<remote_dbmap->num;i++) {
458                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
459                                         break;
460                                 }
461                         }
462                         /* the remote node already have this database */
463                         if (i!=remote_dbmap->num) {
464                                 continue;
465                         }
466                         /* ok so we need to create this database */
467                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
468                                                   dbmap->dbs[db].db_id, mem_ctx,
469                                                   &name);
470                         if (ret != 0) {
471                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
472                                 return -1;
473                         }
474                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
475                                                  nodemap->nodes[j].pnn,
476                                                  mem_ctx, name,
477                                                  dbmap->dbs[db].flags, NULL);
478                         if (ret != 0) {
479                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
480                                 return -1;
481                         }
482                 }
483         }
484
485         return 0;
486 }
487
488
489 /*
490   ensure we are attached to any databases that anyone else is attached to
491  */
492 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
493                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
494 {
495         int i, j, db, ret;
496         struct ctdb_dbid_map_old *remote_dbmap;
497
498         /* verify that we have all database any other node has */
499         for (j=0; j<nodemap->num; j++) {
500                 /* we don't need to ourself ourselves */
501                 if (nodemap->nodes[j].pnn == pnn) {
502                         continue;
503                 }
504                 /* don't check nodes that are unavailable */
505                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
506                         continue;
507                 }
508
509                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
510                                          mem_ctx, &remote_dbmap);
511                 if (ret != 0) {
512                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
513                         return -1;
514                 }
515
516                 /* step through all databases on the remote node */
517                 for (db=0; db<remote_dbmap->num;db++) {
518                         const char *name;
519
520                         for (i=0;i<(*dbmap)->num;i++) {
521                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
522                                         break;
523                                 }
524                         }
525                         /* we already have this db locally */
526                         if (i!=(*dbmap)->num) {
527                                 continue;
528                         }
529                         /* ok so we need to create this database and
530                            rebuild dbmap
531                          */
532                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
533                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
534                         if (ret != 0) {
535                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
536                                           nodemap->nodes[j].pnn));
537                                 return -1;
538                         }
539                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
540                                            mem_ctx, name,
541                                            remote_dbmap->dbs[db].flags, NULL);
542                         if (ret != 0) {
543                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
544                                 return -1;
545                         }
546                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
547                         if (ret != 0) {
548                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
549                                 return -1;
550                         }
551                 }
552         }
553
554         return 0;
555 }
556
557 /*
558   update flags on all active nodes
559  */
560 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
561 {
562         int ret;
563
564         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
565                 if (ret != 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
567                 return -1;
568         }
569
570         return 0;
571 }
572
573 /*
574   called when a vacuum fetch has completed - just free it and do the next one
575  */
576 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
577 {
578         talloc_free(state);
579 }
580
581
582 /**
583  * Process one elements of the vacuum fetch list:
584  * Migrate it over to us with the special flag
585  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
586  */
587 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
588                                      uint32_t pnn,
589                                      struct ctdb_rec_data_old *r)
590 {
591         struct ctdb_client_call_state *state;
592         TDB_DATA data;
593         struct ctdb_ltdb_header *hdr;
594         struct ctdb_call call;
595
596         ZERO_STRUCT(call);
597         call.call_id = CTDB_NULL_FUNC;
598         call.flags = CTDB_IMMEDIATE_MIGRATION;
599         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
600
601         call.key.dptr = &r->data[0];
602         call.key.dsize = r->keylen;
603
604         /* ensure we don't block this daemon - just skip a record if we can't get
605            the chainlock */
606         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
607                 return true;
608         }
609
610         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
611         if (data.dptr == NULL) {
612                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
613                 return true;
614         }
615
616         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
617                 free(data.dptr);
618                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
619                 return true;
620         }
621
622         hdr = (struct ctdb_ltdb_header *)data.dptr;
623         if (hdr->dmaster == pnn) {
624                 /* its already local */
625                 free(data.dptr);
626                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
627                 return true;
628         }
629
630         free(data.dptr);
631
632         state = ctdb_call_send(ctdb_db, &call);
633         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
634         if (state == NULL) {
635                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
636                 return false;
637         }
638         state->async.fn = vacuum_fetch_callback;
639         state->async.private_data = NULL;
640
641         return true;
642 }
643
644
645 /*
646   handler for vacuum fetch
647 */
648 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
649                                  void *private_data)
650 {
651         struct ctdb_recoverd *rec = talloc_get_type(
652                 private_data, struct ctdb_recoverd);
653         struct ctdb_context *ctdb = rec->ctdb;
654         struct ctdb_marshall_buffer *recs;
655         int ret, i;
656         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
657         const char *name;
658         struct ctdb_dbid_map_old *dbmap=NULL;
659         uint8_t db_flags = 0;
660         struct ctdb_db_context *ctdb_db;
661         struct ctdb_rec_data_old *r;
662
663         recs = (struct ctdb_marshall_buffer *)data.dptr;
664
665         if (recs->count == 0) {
666                 goto done;
667         }
668
669         /* work out if the database is persistent */
670         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
673                 goto done;
674         }
675
676         for (i=0;i<dbmap->num;i++) {
677                 if (dbmap->dbs[i].db_id == recs->db_id) {
678                         db_flags = dbmap->dbs[i].flags;
679                         break;
680                 }
681         }
682         if (i == dbmap->num) {
683                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
684                 goto done;
685         }
686
687         /* find the name of this database */
688         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
689                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
690                 goto done;
691         }
692
693         /* attach to it */
694         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
695         if (ctdb_db == NULL) {
696                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
697                 goto done;
698         }
699
700         r = (struct ctdb_rec_data_old *)&recs->data[0];
701         while (recs->count) {
702                 bool ok;
703
704                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
705                 if (!ok) {
706                         break;
707                 }
708
709                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
710                 recs->count--;
711         }
712
713 done:
714         talloc_free(tmp_ctx);
715 }
716
717
718 /*
719  * handler for database detach
720  */
721 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
722                                     void *private_data)
723 {
724         struct ctdb_recoverd *rec = talloc_get_type(
725                 private_data, struct ctdb_recoverd);
726         struct ctdb_context *ctdb = rec->ctdb;
727         uint32_t db_id;
728         struct ctdb_db_context *ctdb_db;
729
730         if (data.dsize != sizeof(db_id)) {
731                 return;
732         }
733         db_id = *(uint32_t *)data.dptr;
734
735         ctdb_db = find_ctdb_db(ctdb, db_id);
736         if (ctdb_db == NULL) {
737                 /* database is not attached */
738                 return;
739         }
740
741         DLIST_REMOVE(ctdb->db_list, ctdb_db);
742
743         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
744                              ctdb_db->db_name));
745         talloc_free(ctdb_db);
746 }
747
748 /*
749   called when ctdb_wait_timeout should finish
750  */
751 static void ctdb_wait_handler(struct tevent_context *ev,
752                               struct tevent_timer *te,
753                               struct timeval yt, void *p)
754 {
755         uint32_t *timed_out = (uint32_t *)p;
756         (*timed_out) = 1;
757 }
758
759 /*
760   wait for a given number of seconds
761  */
762 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
763 {
764         uint32_t timed_out = 0;
765         time_t usecs = (secs - (time_t)secs) * 1000000;
766         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
767                          ctdb_wait_handler, &timed_out);
768         while (!timed_out) {
769                 tevent_loop_once(ctdb->ev);
770         }
771 }
772
773 /*
774   called when an election times out (ends)
775  */
776 static void ctdb_election_timeout(struct tevent_context *ev,
777                                   struct tevent_timer *te,
778                                   struct timeval t, void *p)
779 {
780         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
781         rec->election_timeout = NULL;
782         fast_start = false;
783
784         DEBUG(DEBUG_WARNING,("Election period ended\n"));
785 }
786
787
788 /*
789   wait for an election to finish. It finished election_timeout seconds after
790   the last election packet is received
791  */
792 static void ctdb_wait_election(struct ctdb_recoverd *rec)
793 {
794         struct ctdb_context *ctdb = rec->ctdb;
795         while (rec->election_timeout) {
796                 tevent_loop_once(ctdb->ev);
797         }
798 }
799
800 /*
801   Update our local flags from all remote connected nodes. 
802   This is only run when we are or we belive we are the recovery master
803  */
804 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
805 {
806         int j;
807         struct ctdb_context *ctdb = rec->ctdb;
808         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
809
810         /* get the nodemap for all active remote nodes and verify
811            they are the same as for this node
812          */
813         for (j=0; j<nodemap->num; j++) {
814                 struct ctdb_node_map_old *remote_nodemap=NULL;
815                 int ret;
816
817                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
818                         continue;
819                 }
820                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
821                         continue;
822                 }
823
824                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
825                                            mem_ctx, &remote_nodemap);
826                 if (ret != 0) {
827                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
828                                   nodemap->nodes[j].pnn));
829                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
830                         talloc_free(mem_ctx);
831                         return -1;
832                 }
833                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
834                         /* We should tell our daemon about this so it
835                            updates its flags or else we will log the same 
836                            message again in the next iteration of recovery.
837                            Since we are the recovery master we can just as
838                            well update the flags on all nodes.
839                         */
840                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
841                         if (ret != 0) {
842                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
843                                 return -1;
844                         }
845
846                         /* Update our local copy of the flags in the recovery
847                            daemon.
848                         */
849                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
850                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
851                                  nodemap->nodes[j].flags));
852                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
853                 }
854                 talloc_free(remote_nodemap);
855         }
856         talloc_free(mem_ctx);
857         return 0;
858 }
859
860
861 /* Create a new random generation id.
862    The generation id can not be the INVALID_GENERATION id
863 */
864 static uint32_t new_generation(void)
865 {
866         uint32_t generation;
867
868         while (1) {
869                 generation = random();
870
871                 if (generation != INVALID_GENERATION) {
872                         break;
873                 }
874         }
875
876         return generation;
877 }
878
879 static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
880 {
881         return (rec->recovery_lock_handle != NULL);
882 }
883
884 struct hold_reclock_state {
885         bool done;
886         bool locked;
887         double latency;
888 };
889
890 static void take_reclock_handler(char status,
891                                  double latency,
892                                  void *private_data)
893 {
894         struct hold_reclock_state *s =
895                 (struct hold_reclock_state *) private_data;
896
897         switch (status) {
898         case '0':
899                 s->latency = latency;
900                 break;
901
902         case '1':
903                 DEBUG(DEBUG_ERR,
904                       ("Unable to take recovery lock - contention\n"));
905                 break;
906
907         default:
908                 DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
909         }
910
911         s->done = true;
912         s->locked = (status == '0') ;
913 }
914
915 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);
916
917 static void lost_reclock_handler(void *private_data)
918 {
919         struct ctdb_recoverd *rec = talloc_get_type_abort(
920                 private_data, struct ctdb_recoverd);
921
922         DEBUG(DEBUG_ERR,
923               ("Recovery lock helper terminated unexpectedly - "
924                "trying to retake recovery lock\n"));
925         TALLOC_FREE(rec->recovery_lock_handle);
926         if (! ctdb_recovery_lock(rec)) {
927                 DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
928         }
929 }
930
931 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
932 {
933         struct ctdb_context *ctdb = rec->ctdb;
934         struct ctdb_cluster_mutex_handle *h;
935         struct hold_reclock_state s = {
936                 .done = false,
937                 .locked = false,
938                 .latency = 0,
939         };
940
941         h = ctdb_cluster_mutex(rec, ctdb, ctdb->recovery_lock, 0,
942                                take_reclock_handler, &s,
943                                lost_reclock_handler, rec);
944         if (h == NULL) {
945                 return false;
946         }
947
948         while (!s.done) {
949                 tevent_loop_once(ctdb->ev);
950         }
951
952         if (! s.locked) {
953                 talloc_free(h);
954                 return false;
955         }
956
957         rec->recovery_lock_handle = h;
958         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(),
959                                            s.latency);
960
961         return true;
962 }
963
964 static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
965 {
966         if (rec->recovery_lock_handle != NULL) {
967                 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
968                 TALLOC_FREE(rec->recovery_lock_handle);
969         }
970 }
971
972 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
973 {
974         struct ctdb_context *ctdb = rec->ctdb;
975         int i;
976         struct ctdb_banning_state *ban_state;
977
978         *self_ban = false;
979         for (i=0; i<ctdb->num_nodes; i++) {
980                 if (ctdb->nodes[i]->ban_state == NULL) {
981                         continue;
982                 }
983                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
984                 if (ban_state->count < 2*ctdb->num_nodes) {
985                         continue;
986                 }
987
988                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
989                         ctdb->nodes[i]->pnn, ban_state->count,
990                         ctdb->tunable.recovery_ban_period));
991                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
992                 ban_state->count = 0;
993
994                 /* Banning ourself? */
995                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
996                         *self_ban = true;
997                 }
998         }
999 }
1000
1001 struct helper_state {
1002         int fd[2];
1003         pid_t pid;
1004         int result;
1005         bool done;
1006 };
1007
1008 static void helper_handler(struct tevent_context *ev,
1009                            struct tevent_fd *fde,
1010                            uint16_t flags, void *private_data)
1011 {
1012         struct helper_state *state = talloc_get_type_abort(
1013                 private_data, struct helper_state);
1014         int ret;
1015
1016         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1017         if (ret != sizeof(state->result)) {
1018                 state->result = EPIPE;
1019         }
1020
1021         state->done = true;
1022 }
1023
1024 static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1025                       const char *prog, const char *arg, const char *type)
1026 {
1027         struct helper_state *state;
1028         struct tevent_fd *fde;
1029         const char **args;
1030         int nargs, ret;
1031         uint32_t recmaster = rec->recmaster;
1032
1033         state = talloc_zero(mem_ctx, struct helper_state);
1034         if (state == NULL) {
1035                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1036                 return -1;
1037         }
1038
1039         state->pid = -1;
1040
1041         ret = pipe(state->fd);
1042         if (ret != 0) {
1043                 DEBUG(DEBUG_ERR,
1044                       ("Failed to create pipe for %s helper\n", type));
1045                 goto fail;
1046         }
1047
1048         set_close_on_exec(state->fd[0]);
1049
1050         nargs = 4;
1051         args = talloc_array(state, const char *, nargs);
1052         if (args == NULL) {
1053                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1054                 goto fail;
1055         }
1056
1057         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1058         if (args[0] == NULL) {
1059                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1060                 goto fail;
1061         }
1062         args[1] = rec->ctdb->daemon.name;
1063         args[2] = arg;
1064         args[3] = NULL;
1065
1066         if (args[2] == NULL) {
1067                 nargs = 3;
1068         }
1069
1070         state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
1071         if (state->pid == -1) {
1072                 DEBUG(DEBUG_ERR,
1073                       ("Failed to create child for %s helper\n", type));
1074                 goto fail;
1075         }
1076
1077         close(state->fd[1]);
1078         state->fd[1] = -1;
1079
1080         state->done = false;
1081
1082         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1083                             TEVENT_FD_READ, helper_handler, state);
1084         if (fde == NULL) {
1085                 goto fail;
1086         }
1087         tevent_fd_set_auto_close(fde);
1088
1089         while (!state->done) {
1090                 tevent_loop_once(rec->ctdb->ev);
1091
1092                 /* If recmaster changes, we have lost election */
1093                 if (recmaster != rec->recmaster) {
1094                         D_ERR("Recmaster changed to %u, aborting %s\n",
1095                               rec->recmaster, type);
1096                         state->result = 1;
1097                         break;
1098                 }
1099         }
1100
1101         close(state->fd[0]);
1102         state->fd[0] = -1;
1103
1104         if (state->result != 0) {
1105                 goto fail;
1106         }
1107
1108         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1109         talloc_free(state);
1110         return 0;
1111
1112 fail:
1113         if (state->fd[0] != -1) {
1114                 close(state->fd[0]);
1115         }
1116         if (state->fd[1] != -1) {
1117                 close(state->fd[1]);
1118         }
1119         if (state->pid != -1) {
1120                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1121         }
1122         talloc_free(state);
1123         return -1;
1124 }
1125
1126
1127 static int ctdb_takeover(struct ctdb_recoverd *rec,
1128                          uint32_t *force_rebalance_nodes)
1129 {
1130         static char prog[PATH_MAX+1] = "";
1131         char *arg;
1132         int i, ret;
1133
1134         if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
1135                              "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
1136                              "ctdb_takeover_helper")) {
1137                 ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
1138         }
1139
1140         arg = NULL;
1141         for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
1142                 uint32_t pnn = force_rebalance_nodes[i];
1143                 if (arg == NULL) {
1144                         arg = talloc_asprintf(rec, "%u", pnn);
1145                 } else {
1146                         arg = talloc_asprintf_append(arg, ",%u", pnn);
1147                 }
1148                 if (arg == NULL) {
1149                         DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1150                         return -1;
1151                 }
1152         }
1153
1154         if (ctdb_config.failover_disabled) {
1155                 ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
1156                 if (ret != 0) {
1157                         D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
1158                         return -1;
1159                 }
1160         }
1161
1162         return helper_run(rec, rec, prog, arg, "takeover");
1163 }
1164
1165 static bool do_takeover_run(struct ctdb_recoverd *rec,
1166                             struct ctdb_node_map_old *nodemap)
1167 {
1168         uint32_t *nodes = NULL;
1169         struct ctdb_disable_message dtr;
1170         TDB_DATA data;
1171         int i;
1172         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1173         int ret;
1174         bool ok;
1175
1176         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1177
1178         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1179                 DEBUG(DEBUG_ERR, (__location__
1180                                   " takeover run already in progress \n"));
1181                 ok = false;
1182                 goto done;
1183         }
1184
1185         if (!ctdb_op_begin(rec->takeover_run)) {
1186                 ok = false;
1187                 goto done;
1188         }
1189
1190         /* Disable IP checks (takeover runs, really) on other nodes
1191          * while doing this takeover run.  This will stop those other
1192          * nodes from triggering takeover runs when think they should
1193          * be hosting an IP but it isn't yet on an interface.  Don't
1194          * wait for replies since a failure here might cause some
1195          * noise in the logs but will not actually cause a problem.
1196          */
1197         ZERO_STRUCT(dtr);
1198         dtr.srvid = 0; /* No reply */
1199         dtr.pnn = -1;
1200
1201         data.dptr  = (uint8_t*)&dtr;
1202         data.dsize = sizeof(dtr);
1203
1204         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1205
1206         /* Disable for 60 seconds.  This can be a tunable later if
1207          * necessary.
1208          */
1209         dtr.timeout = 60;
1210         for (i = 0; i < talloc_array_length(nodes); i++) {
1211                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1212                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1213                                              data) != 0) {
1214                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1215                 }
1216         }
1217
1218         ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
1219
1220         /* Reenable takeover runs and IP checks on other nodes */
1221         dtr.timeout = 0;
1222         for (i = 0; i < talloc_array_length(nodes); i++) {
1223                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1224                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1225                                              data) != 0) {
1226                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1227                 }
1228         }
1229
1230         if (ret != 0) {
1231                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1232                 ok = false;
1233                 goto done;
1234         }
1235
1236         ok = true;
1237         /* Takeover run was successful so clear force rebalance targets */
1238         if (rebalance_nodes == rec->force_rebalance_nodes) {
1239                 TALLOC_FREE(rec->force_rebalance_nodes);
1240         } else {
1241                 DEBUG(DEBUG_WARNING,
1242                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1243         }
1244 done:
1245         rec->need_takeover_run = !ok;
1246         talloc_free(nodes);
1247         ctdb_op_end(rec->takeover_run);
1248
1249         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1250         return ok;
1251 }
1252
1253 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1254 {
1255         static char prog[PATH_MAX+1] = "";
1256         const char *arg;
1257
1258         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1259                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1260                              "ctdb_recovery_helper")) {
1261                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1262         }
1263
1264         arg = talloc_asprintf(mem_ctx, "%u", new_generation());
1265         if (arg == NULL) {
1266                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1267                 return -1;
1268         }
1269
1270         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1271
1272         return helper_run(rec, mem_ctx, prog, arg, "recovery");
1273 }
1274
1275 /*
1276   we are the recmaster, and recovery is needed - start a recovery run
1277  */
1278 static int do_recovery(struct ctdb_recoverd *rec,
1279                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1280                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1281 {
1282         struct ctdb_context *ctdb = rec->ctdb;
1283         int i, ret;
1284         struct ctdb_dbid_map_old *dbmap;
1285         bool self_ban;
1286
1287         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1288
1289         /* Check if the current node is still the recmaster.  It's possible that
1290          * re-election has changed the recmaster.
1291          */
1292         if (pnn != rec->recmaster) {
1293                 DEBUG(DEBUG_NOTICE,
1294                       ("Recovery master changed to %u, aborting recovery\n",
1295                        rec->recmaster));
1296                 return -1;
1297         }
1298
1299         /* if recovery fails, force it again */
1300         rec->need_recovery = true;
1301
1302         if (!ctdb_op_begin(rec->recovery)) {
1303                 return -1;
1304         }
1305
1306         if (rec->election_timeout) {
1307                 /* an election is in progress */
1308                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1309                 goto fail;
1310         }
1311
1312         ban_misbehaving_nodes(rec, &self_ban);
1313         if (self_ban) {
1314                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1315                 goto fail;
1316         }
1317
1318         if (ctdb->recovery_lock != NULL) {
1319                 if (ctdb_recovery_have_lock(rec)) {
1320                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
1321                 } else {
1322                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
1323                                              ctdb->recovery_lock));
1324                         if (!ctdb_recovery_lock(rec)) {
1325                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1326                                         /* If ctdb is trying first recovery, it's
1327                                          * possible that current node does not know
1328                                          * yet who the recmaster is.
1329                                          */
1330                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
1331                                                           " - retrying recovery\n"));
1332                                         goto fail;
1333                                 }
1334
1335                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1336                                                  "and ban ourself for %u seconds\n",
1337                                                  ctdb->tunable.recovery_ban_period));
1338                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1339                                 goto fail;
1340                         }
1341                         DEBUG(DEBUG_NOTICE,
1342                               ("Recovery lock taken successfully by recovery daemon\n"));
1343                 }
1344         }
1345
1346         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1347
1348         /* get a list of all databases */
1349         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1350         if (ret != 0) {
1351                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1352                 goto fail;
1353         }
1354
1355         /* we do the db creation before we set the recovery mode, so the freeze happens
1356            on all databases we will be dealing with. */
1357
1358         /* verify that we have all the databases any other node has */
1359         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1360         if (ret != 0) {
1361                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1362                 goto fail;
1363         }
1364
1365         /* verify that all other nodes have all our databases */
1366         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1367         if (ret != 0) {
1368                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1369                 goto fail;
1370         }
1371         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1372
1373
1374         /* Retrieve capabilities from all connected nodes */
1375         ret = update_capabilities(rec, nodemap);
1376         if (ret!=0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1378                 return -1;
1379         }
1380
1381         /*
1382           update all nodes to have the same flags that we have
1383          */
1384         for (i=0;i<nodemap->num;i++) {
1385                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1386                         continue;
1387                 }
1388
1389                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1390                 if (ret != 0) {
1391                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1392                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1393                         } else {
1394                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1395                                 return -1;
1396                         }
1397                 }
1398         }
1399
1400         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1401
1402         ret = db_recovery_parallel(rec, mem_ctx);
1403         if (ret != 0) {
1404                 goto fail;
1405         }
1406
1407         do_takeover_run(rec, nodemap);
1408
1409         /* send a message to all clients telling them that the cluster 
1410            has been reconfigured */
1411         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
1412                                        CTDB_SRVID_RECONFIGURE, tdb_null);
1413         if (ret != 0) {
1414                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
1415                 goto fail;
1416         }
1417
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1419
1420         rec->need_recovery = false;
1421         ctdb_op_end(rec->recovery);
1422
1423         /* we managed to complete a full recovery, make sure to forgive
1424            any past sins by the nodes that could now participate in the
1425            recovery.
1426         */
1427         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1428         for (i=0;i<nodemap->num;i++) {
1429                 struct ctdb_banning_state *ban_state;
1430
1431                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1432                         continue;
1433                 }
1434
1435                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1436                 if (ban_state == NULL) {
1437                         continue;
1438                 }
1439
1440                 ban_state->count = 0;
1441         }
1442
1443         /* We just finished a recovery successfully.
1444            We now wait for rerecovery_timeout before we allow
1445            another recovery to take place.
1446         */
1447         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1448         ctdb_op_disable(rec->recovery, ctdb->ev,
1449                         ctdb->tunable.rerecovery_timeout);
1450         return 0;
1451
1452 fail:
1453         ctdb_op_end(rec->recovery);
1454         return -1;
1455 }
1456
1457
1458 /*
1459   elections are won by first checking the number of connected nodes, then
1460   the priority time, then the pnn
1461  */
1462 struct election_message {
1463         uint32_t num_connected;
1464         struct timeval priority_time;
1465         uint32_t pnn;
1466         uint32_t node_flags;
1467 };
1468
1469 /*
1470   form this nodes election data
1471  */
1472 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1473 {
1474         int ret, i;
1475         struct ctdb_node_map_old *nodemap;
1476         struct ctdb_context *ctdb = rec->ctdb;
1477
1478         ZERO_STRUCTP(em);
1479
1480         em->pnn = rec->ctdb->pnn;
1481         em->priority_time = rec->priority_time;
1482
1483         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1484         if (ret != 0) {
1485                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1486                 return;
1487         }
1488
1489         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1490         em->node_flags = rec->node_flags;
1491
1492         for (i=0;i<nodemap->num;i++) {
1493                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1494                         em->num_connected++;
1495                 }
1496         }
1497
1498         /* we shouldnt try to win this election if we cant be a recmaster */
1499         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1500                 em->num_connected = 0;
1501                 em->priority_time = timeval_current();
1502         }
1503
1504         talloc_free(nodemap);
1505 }
1506
1507 /*
1508   see if the given election data wins
1509  */
1510 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1511 {
1512         struct election_message myem;
1513         int cmp = 0;
1514
1515         ctdb_election_data(rec, &myem);
1516
1517         /* we cant win if we don't have the recmaster capability */
1518         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1519                 return false;
1520         }
1521
1522         /* we cant win if we are banned */
1523         if (rec->node_flags & NODE_FLAGS_BANNED) {
1524                 return false;
1525         }
1526
1527         /* we cant win if we are stopped */
1528         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1529                 return false;
1530         }
1531
1532         /* we will automatically win if the other node is banned */
1533         if (em->node_flags & NODE_FLAGS_BANNED) {
1534                 return true;
1535         }
1536
1537         /* we will automatically win if the other node is banned */
1538         if (em->node_flags & NODE_FLAGS_STOPPED) {
1539                 return true;
1540         }
1541
1542         /* then the longest running node */
1543         if (cmp == 0) {
1544                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1545         }
1546
1547         if (cmp == 0) {
1548                 cmp = (int)myem.pnn - (int)em->pnn;
1549         }
1550
1551         return cmp > 0;
1552 }
1553
1554 /*
1555   send out an election request
1556  */
1557 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1558 {
1559         int ret;
1560         TDB_DATA election_data;
1561         struct election_message emsg;
1562         uint64_t srvid;
1563         struct ctdb_context *ctdb = rec->ctdb;
1564
1565         srvid = CTDB_SRVID_ELECTION;
1566
1567         ctdb_election_data(rec, &emsg);
1568
1569         election_data.dsize = sizeof(struct election_message);
1570         election_data.dptr  = (unsigned char *)&emsg;
1571
1572
1573         /* first we assume we will win the election and set 
1574            recoverymaster to be ourself on the current node
1575          */
1576         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1577                                      CTDB_CURRENT_NODE, pnn);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1580                 return -1;
1581         }
1582         rec->recmaster = pnn;
1583
1584         /* send an election message to all active nodes */
1585         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1586         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1587 }
1588
1589 /*
1590   we think we are winning the election - send a broadcast election request
1591  */
1592 static void election_send_request(struct tevent_context *ev,
1593                                   struct tevent_timer *te,
1594                                   struct timeval t, void *p)
1595 {
1596         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1597         int ret;
1598
1599         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1600         if (ret != 0) {
1601                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1602         }
1603
1604         TALLOC_FREE(rec->send_election_te);
1605 }
1606
1607 /*
1608   handler for memory dumps
1609 */
1610 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1611 {
1612         struct ctdb_recoverd *rec = talloc_get_type(
1613                 private_data, struct ctdb_recoverd);
1614         struct ctdb_context *ctdb = rec->ctdb;
1615         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1616         TDB_DATA *dump;
1617         int ret;
1618         struct ctdb_srvid_message *rd;
1619
1620         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1621                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1622                 talloc_free(tmp_ctx);
1623                 return;
1624         }
1625         rd = (struct ctdb_srvid_message *)data.dptr;
1626
1627         dump = talloc_zero(tmp_ctx, TDB_DATA);
1628         if (dump == NULL) {
1629                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1630                 talloc_free(tmp_ctx);
1631                 return;
1632         }
1633         ret = ctdb_dump_memory(ctdb, dump);
1634         if (ret != 0) {
1635                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1636                 talloc_free(tmp_ctx);
1637                 return;
1638         }
1639
1640 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1641
1642         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1643         if (ret != 0) {
1644                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1645                 talloc_free(tmp_ctx);
1646                 return;
1647         }
1648
1649         talloc_free(tmp_ctx);
1650 }
1651
1652 /*
1653   handler for reload_nodes
1654 */
1655 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1656                                  void *private_data)
1657 {
1658         struct ctdb_recoverd *rec = talloc_get_type(
1659                 private_data, struct ctdb_recoverd);
1660
1661         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1662
1663         ctdb_load_nodes_file(rec->ctdb);
1664 }
1665
1666
1667 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1668                                         void *private_data)
1669 {
1670         struct ctdb_recoverd *rec = talloc_get_type(
1671                 private_data, struct ctdb_recoverd);
1672         struct ctdb_context *ctdb = rec->ctdb;
1673         uint32_t pnn;
1674         uint32_t *t;
1675         int len;
1676
1677         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1678                 return;
1679         }
1680
1681         if (data.dsize != sizeof(uint32_t)) {
1682                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1683                 return;
1684         }
1685
1686         pnn = *(uint32_t *)&data.dptr[0];
1687
1688         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1689
1690         /* Copy any existing list of nodes.  There's probably some
1691          * sort of realloc variant that will do this but we need to
1692          * make sure that freeing the old array also cancels the timer
1693          * event for the timeout... not sure if realloc will do that.
1694          */
1695         len = (rec->force_rebalance_nodes != NULL) ?
1696                 talloc_array_length(rec->force_rebalance_nodes) :
1697                 0;
1698
1699         /* This allows duplicates to be added but they don't cause
1700          * harm.  A call to add a duplicate PNN arguably means that
1701          * the timeout should be reset, so this is the simplest
1702          * solution.
1703          */
1704         t = talloc_zero_array(rec, uint32_t, len+1);
1705         CTDB_NO_MEMORY_VOID(ctdb, t);
1706         if (len > 0) {
1707                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1708         }
1709         t[len] = pnn;
1710
1711         talloc_free(rec->force_rebalance_nodes);
1712
1713         rec->force_rebalance_nodes = t;
1714 }
1715
1716
1717
1718 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1719                                     TDB_DATA data,
1720                                     struct ctdb_op_state *op_state)
1721 {
1722         struct ctdb_disable_message *r;
1723         uint32_t timeout;
1724         TDB_DATA result;
1725         int32_t ret = 0;
1726
1727         /* Validate input data */
1728         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1729                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1730                                  "expecting %lu\n", (long unsigned)data.dsize,
1731                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
1732                 return;
1733         }
1734         if (data.dptr == NULL) {
1735                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1736                 return;
1737         }
1738
1739         r = (struct ctdb_disable_message *)data.dptr;
1740         timeout = r->timeout;
1741
1742         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1743         if (ret != 0) {
1744                 goto done;
1745         }
1746
1747         /* Returning our PNN tells the caller that we succeeded */
1748         ret = ctdb_get_pnn(ctdb);
1749 done:
1750         result.dsize = sizeof(int32_t);
1751         result.dptr  = (uint8_t *)&ret;
1752         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1753 }
1754
1755 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1756                                           void *private_data)
1757 {
1758         struct ctdb_recoverd *rec = talloc_get_type(
1759                 private_data, struct ctdb_recoverd);
1760
1761         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1762 }
1763
1764 /* Backward compatibility for this SRVID */
1765 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1766                                      void *private_data)
1767 {
1768         struct ctdb_recoverd *rec = talloc_get_type(
1769                 private_data, struct ctdb_recoverd);
1770         uint32_t timeout;
1771
1772         if (data.dsize != sizeof(uint32_t)) {
1773                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1774                                  "expecting %lu\n", (long unsigned)data.dsize,
1775                                  (long unsigned)sizeof(uint32_t)));
1776                 return;
1777         }
1778         if (data.dptr == NULL) {
1779                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1780                 return;
1781         }
1782
1783         timeout = *((uint32_t *)data.dptr);
1784
1785         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1786 }
1787
1788 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1789                                        void *private_data)
1790 {
1791         struct ctdb_recoverd *rec = talloc_get_type(
1792                 private_data, struct ctdb_recoverd);
1793
1794         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1795 }
1796
1797 /*
1798   handler for ip reallocate, just add it to the list of requests and 
1799   handle this later in the monitor_cluster loop so we do not recurse
1800   with other requests to takeover_run()
1801 */
1802 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1803                                   void *private_data)
1804 {
1805         struct ctdb_srvid_message *request;
1806         struct ctdb_recoverd *rec = talloc_get_type(
1807                 private_data, struct ctdb_recoverd);
1808
1809         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1810                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1811                 return;
1812         }
1813
1814         request = (struct ctdb_srvid_message *)data.dptr;
1815
1816         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1817 }
1818
1819 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1820                                           struct ctdb_recoverd *rec)
1821 {
1822         TDB_DATA result;
1823         int32_t ret;
1824         struct srvid_requests *current;
1825
1826         /* Only process requests that are currently pending.  More
1827          * might come in while the takeover run is in progress and
1828          * they will need to be processed later since they might
1829          * be in response flag changes.
1830          */
1831         current = rec->reallocate_requests;
1832         rec->reallocate_requests = NULL;
1833
1834         if (do_takeover_run(rec, rec->nodemap)) {
1835                 ret = ctdb_get_pnn(ctdb);
1836         } else {
1837                 ret = -1;
1838         }
1839
1840         result.dsize = sizeof(int32_t);
1841         result.dptr  = (uint8_t *)&ret;
1842
1843         srvid_requests_reply(ctdb, &current, result);
1844 }
1845
1846 /*
1847  * handler for assigning banning credits
1848  */
1849 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1850 {
1851         struct ctdb_recoverd *rec = talloc_get_type(
1852                 private_data, struct ctdb_recoverd);
1853         uint32_t ban_pnn;
1854
1855         /* Ignore if we are not recmaster */
1856         if (rec->ctdb->pnn != rec->recmaster) {
1857                 return;
1858         }
1859
1860         if (data.dsize != sizeof(uint32_t)) {
1861                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
1862                                   data.dsize));
1863                 return;
1864         }
1865
1866         ban_pnn = *(uint32_t *)data.dptr;
1867
1868         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1869 }
1870
1871 /*
1872   handler for recovery master elections
1873 */
1874 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1875 {
1876         struct ctdb_recoverd *rec = talloc_get_type(
1877                 private_data, struct ctdb_recoverd);
1878         struct ctdb_context *ctdb = rec->ctdb;
1879         int ret;
1880         struct election_message *em = (struct election_message *)data.dptr;
1881
1882         /* Ignore election packets from ourself */
1883         if (ctdb->pnn == em->pnn) {
1884                 return;
1885         }
1886
1887         /* we got an election packet - update the timeout for the election */
1888         talloc_free(rec->election_timeout);
1889         rec->election_timeout = tevent_add_timer(
1890                         ctdb->ev, ctdb,
1891                         fast_start ?
1892                                 timeval_current_ofs(0, 500000) :
1893                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1894                         ctdb_election_timeout, rec);
1895
1896         /* someone called an election. check their election data
1897            and if we disagree and we would rather be the elected node, 
1898            send a new election message to all other nodes
1899          */
1900         if (ctdb_election_win(rec, em)) {
1901                 if (!rec->send_election_te) {
1902                         rec->send_election_te = tevent_add_timer(
1903                                         ctdb->ev, rec,
1904                                         timeval_current_ofs(0, 500000),
1905                                         election_send_request, rec);
1906                 }
1907                 return;
1908         }
1909
1910         /* we didn't win */
1911         TALLOC_FREE(rec->send_election_te);
1912
1913         /* Release the recovery lock file */
1914         if (ctdb_recovery_have_lock(rec)) {
1915                 ctdb_recovery_unlock(rec);
1916         }
1917
1918         /* ok, let that guy become recmaster then */
1919         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1920                                      CTDB_CURRENT_NODE, em->pnn);
1921         if (ret != 0) {
1922                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
1923                 return;
1924         }
1925         rec->recmaster = em->pnn;
1926
1927         return;
1928 }
1929
1930
1931 /*
1932   force the start of the election process
1933  */
1934 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1935                            struct ctdb_node_map_old *nodemap)
1936 {
1937         int ret;
1938         struct ctdb_context *ctdb = rec->ctdb;
1939
1940         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1941
1942         /* set all nodes to recovery mode to stop all internode traffic */
1943         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1946                 return;
1947         }
1948
1949         talloc_free(rec->election_timeout);
1950         rec->election_timeout = tevent_add_timer(
1951                         ctdb->ev, ctdb,
1952                         fast_start ?
1953                                 timeval_current_ofs(0, 500000) :
1954                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1955                         ctdb_election_timeout, rec);
1956
1957         ret = send_election_request(rec, pnn);
1958         if (ret!=0) {
1959                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1960                 return;
1961         }
1962
1963         /* wait for a few seconds to collect all responses */
1964         ctdb_wait_election(rec);
1965 }
1966
1967
1968
1969 /*
1970   handler for when a node changes its flags
1971 */
1972 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1973 {
1974         struct ctdb_recoverd *rec = talloc_get_type(
1975                 private_data, struct ctdb_recoverd);
1976         struct ctdb_context *ctdb = rec->ctdb;
1977         int ret;
1978         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1979         struct ctdb_node_map_old *nodemap=NULL;
1980         TALLOC_CTX *tmp_ctx;
1981         int i;
1982
1983         if (data.dsize != sizeof(*c)) {
1984                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1985                 return;
1986         }
1987
1988         tmp_ctx = talloc_new(ctdb);
1989         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1990
1991         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1992         if (ret != 0) {
1993                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1994                 talloc_free(tmp_ctx);
1995                 return;         
1996         }
1997
1998
1999         for (i=0;i<nodemap->num;i++) {
2000                 if (nodemap->nodes[i].pnn == c->pnn) break;
2001         }
2002
2003         if (i == nodemap->num) {
2004                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2005                 talloc_free(tmp_ctx);
2006                 return;
2007         }
2008
2009         if (c->old_flags != c->new_flags) {
2010                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2011         }
2012
2013         nodemap->nodes[i].flags = c->new_flags;
2014
2015         talloc_free(tmp_ctx);
2016 }
2017
2018 /*
2019   handler for when we need to push out flag changes ot all other nodes
2020 */
2021 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2022                                void *private_data)
2023 {
2024         struct ctdb_recoverd *rec = talloc_get_type(
2025                 private_data, struct ctdb_recoverd);
2026         struct ctdb_context *ctdb = rec->ctdb;
2027         int ret;
2028         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2029         struct ctdb_node_map_old *nodemap=NULL;
2030         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2031         uint32_t *nodes;
2032
2033         /* read the node flags from the recmaster */
2034         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2035                                    tmp_ctx, &nodemap);
2036         if (ret != 0) {
2037                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2038                 talloc_free(tmp_ctx);
2039                 return;
2040         }
2041         if (c->pnn >= nodemap->num) {
2042                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2043                 talloc_free(tmp_ctx);
2044                 return;
2045         }
2046
2047         /* send the flags update to all connected nodes */
2048         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2049
2050         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2051                                       nodes, 0, CONTROL_TIMEOUT(),
2052                                       false, data,
2053                                       NULL, NULL,
2054                                       NULL) != 0) {
2055                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2056
2057                 talloc_free(tmp_ctx);
2058                 return;
2059         }
2060
2061         talloc_free(tmp_ctx);
2062 }
2063
2064
2065 struct verify_recmode_normal_data {
2066         uint32_t count;
2067         enum monitor_result status;
2068 };
2069
2070 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2071 {
2072         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2073
2074
2075         /* one more node has responded with recmode data*/
2076         rmdata->count--;
2077
2078         /* if we failed to get the recmode, then return an error and let
2079            the main loop try again.
2080         */
2081         if (state->state != CTDB_CONTROL_DONE) {
2082                 if (rmdata->status == MONITOR_OK) {
2083                         rmdata->status = MONITOR_FAILED;
2084                 }
2085                 return;
2086         }
2087
2088         /* if we got a response, then the recmode will be stored in the
2089            status field
2090         */
2091         if (state->status != CTDB_RECOVERY_NORMAL) {
2092                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2093                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2094         }
2095
2096         return;
2097 }
2098
2099
2100 /* verify that all nodes are in normal recovery mode */
2101 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2102 {
2103         struct verify_recmode_normal_data *rmdata;
2104         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2105         struct ctdb_client_control_state *state;
2106         enum monitor_result status;
2107         int j;
2108         
2109         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2110         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2111         rmdata->count  = 0;
2112         rmdata->status = MONITOR_OK;
2113
2114         /* loop over all active nodes and send an async getrecmode call to 
2115            them*/
2116         for (j=0; j<nodemap->num; j++) {
2117                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2118                         continue;
2119                 }
2120                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2121                                         CONTROL_TIMEOUT(), 
2122                                         nodemap->nodes[j].pnn);
2123                 if (state == NULL) {
2124                         /* we failed to send the control, treat this as 
2125                            an error and try again next iteration
2126                         */                      
2127                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2128                         talloc_free(mem_ctx);
2129                         return MONITOR_FAILED;
2130                 }
2131
2132                 /* set up the callback functions */
2133                 state->async.fn = verify_recmode_normal_callback;
2134                 state->async.private_data = rmdata;
2135
2136                 /* one more control to wait for to complete */
2137                 rmdata->count++;
2138         }
2139
2140
2141         /* now wait for up to the maximum number of seconds allowed
2142            or until all nodes we expect a response from has replied
2143         */
2144         while (rmdata->count > 0) {
2145                 tevent_loop_once(ctdb->ev);
2146         }
2147
2148         status = rmdata->status;
2149         talloc_free(mem_ctx);
2150         return status;
2151 }
2152
2153
2154 struct verify_recmaster_data {
2155         struct ctdb_recoverd *rec;
2156         uint32_t count;
2157         uint32_t pnn;
2158         enum monitor_result status;
2159 };
2160
2161 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2162 {
2163         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2164
2165
2166         /* one more node has responded with recmaster data*/
2167         rmdata->count--;
2168
2169         /* if we failed to get the recmaster, then return an error and let
2170            the main loop try again.
2171         */
2172         if (state->state != CTDB_CONTROL_DONE) {
2173                 if (rmdata->status == MONITOR_OK) {
2174                         rmdata->status = MONITOR_FAILED;
2175                 }
2176                 return;
2177         }
2178
2179         /* if we got a response, then the recmaster will be stored in the
2180            status field
2181         */
2182         if (state->status != rmdata->pnn) {
2183                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2184                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2185                 rmdata->status = MONITOR_ELECTION_NEEDED;
2186         }
2187
2188         return;
2189 }
2190
2191
2192 /* verify that all nodes agree that we are the recmaster */
2193 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2194 {
2195         struct ctdb_context *ctdb = rec->ctdb;
2196         struct verify_recmaster_data *rmdata;
2197         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2198         struct ctdb_client_control_state *state;
2199         enum monitor_result status;
2200         int j;
2201         
2202         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2203         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2204         rmdata->rec    = rec;
2205         rmdata->count  = 0;
2206         rmdata->pnn    = pnn;
2207         rmdata->status = MONITOR_OK;
2208
2209         /* loop over all active nodes and send an async getrecmaster call to
2210            them*/
2211         for (j=0; j<nodemap->num; j++) {
2212                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2213                         continue;
2214                 }
2215                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2216                         continue;
2217                 }
2218                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2219                                         CONTROL_TIMEOUT(),
2220                                         nodemap->nodes[j].pnn);
2221                 if (state == NULL) {
2222                         /* we failed to send the control, treat this as 
2223                            an error and try again next iteration
2224                         */                      
2225                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2226                         talloc_free(mem_ctx);
2227                         return MONITOR_FAILED;
2228                 }
2229
2230                 /* set up the callback functions */
2231                 state->async.fn = verify_recmaster_callback;
2232                 state->async.private_data = rmdata;
2233
2234                 /* one more control to wait for to complete */
2235                 rmdata->count++;
2236         }
2237
2238
2239         /* now wait for up to the maximum number of seconds allowed
2240            or until all nodes we expect a response from has replied
2241         */
2242         while (rmdata->count > 0) {
2243                 tevent_loop_once(ctdb->ev);
2244         }
2245
2246         status = rmdata->status;
2247         talloc_free(mem_ctx);
2248         return status;
2249 }
2250
2251 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2252                                     struct ctdb_recoverd *rec)
2253 {
2254         struct ctdb_iface_list_old *ifaces = NULL;
2255         TALLOC_CTX *mem_ctx;
2256         bool ret = false;
2257
2258         mem_ctx = talloc_new(NULL);
2259
2260         /* Read the interfaces from the local node */
2261         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2262                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2263                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2264                 /* We could return an error.  However, this will be
2265                  * rare so we'll decide that the interfaces have
2266                  * actually changed, just in case.
2267                  */
2268                 talloc_free(mem_ctx);
2269                 return true;
2270         }
2271
2272         if (!rec->ifaces) {
2273                 /* We haven't been here before so things have changed */
2274                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2275                 ret = true;
2276         } else if (rec->ifaces->num != ifaces->num) {
2277                 /* Number of interfaces has changed */
2278                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2279                                      rec->ifaces->num, ifaces->num));
2280                 ret = true;
2281         } else {
2282                 /* See if interface names or link states have changed */
2283                 int i;
2284                 for (i = 0; i < rec->ifaces->num; i++) {
2285                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2286                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2287                                 DEBUG(DEBUG_NOTICE,
2288                                       ("Interface in slot %d changed: %s => %s\n",
2289                                        i, iface->name, ifaces->ifaces[i].name));
2290                                 ret = true;
2291                                 break;
2292                         }
2293                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2294                                 DEBUG(DEBUG_NOTICE,
2295                                       ("Interface %s changed state: %d => %d\n",
2296                                        iface->name, iface->link_state,
2297                                        ifaces->ifaces[i].link_state));
2298                                 ret = true;
2299                                 break;
2300                         }
2301                 }
2302         }
2303
2304         talloc_free(rec->ifaces);
2305         rec->ifaces = talloc_steal(rec, ifaces);
2306
2307         talloc_free(mem_ctx);
2308         return ret;
2309 }
2310
2311 /* Check that the local allocation of public IP addresses is correct
2312  * and do some house-keeping */
2313 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2314                                       struct ctdb_recoverd *rec,
2315                                       uint32_t pnn,
2316                                       struct ctdb_node_map_old *nodemap)
2317 {
2318         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2319         int ret, j;
2320         bool need_takeover_run = false;
2321         struct ctdb_public_ip_list_old *ips = NULL;
2322
2323         /* If we are not the recmaster then do some housekeeping */
2324         if (rec->recmaster != pnn) {
2325                 /* Ignore any IP reallocate requests - only recmaster
2326                  * processes them
2327                  */
2328                 TALLOC_FREE(rec->reallocate_requests);
2329                 /* Clear any nodes that should be force rebalanced in
2330                  * the next takeover run.  If the recovery master role
2331                  * has moved then we don't want to process these some
2332                  * time in the future.
2333                  */
2334                 TALLOC_FREE(rec->force_rebalance_nodes);
2335         }
2336
2337         /* Return early if disabled... */
2338         if (ctdb_config.failover_disabled ||
2339             ctdb_op_is_disabled(rec->takeover_run)) {
2340                 return  0;
2341         }
2342
2343         if (interfaces_have_changed(ctdb, rec)) {
2344                 need_takeover_run = true;
2345         }
2346
2347         /* If there are unhosted IPs but this node can host them then
2348          * trigger an IP reallocation */
2349
2350         /* Read *available* IPs from local node */
2351         ret = ctdb_ctrl_get_public_ips_flags(
2352                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2353                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2354         if (ret != 0) {
2355                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2356                 talloc_free(mem_ctx);
2357                 return -1;
2358         }
2359
2360         for (j=0; j<ips->num; j++) {
2361                 if (ips->ips[j].pnn == -1 &&
2362                     nodemap->nodes[pnn].flags == 0) {
2363                         DEBUG(DEBUG_WARNING,
2364                               ("Unassigned IP %s can be served by this node\n",
2365                                ctdb_addr_to_str(&ips->ips[j].addr)));
2366                         need_takeover_run = true;
2367                 }
2368         }
2369
2370         talloc_free(ips);
2371
2372         if (!ctdb->do_checkpublicip) {
2373                 goto done;
2374         }
2375
2376         /* Validate the IP addresses that this node has on network
2377          * interfaces.  If there is an inconsistency between reality
2378          * and the state expected by CTDB then try to fix it by
2379          * triggering an IP reallocation or releasing extraneous IP
2380          * addresses. */
2381
2382         /* Read *known* IPs from local node */
2383         ret = ctdb_ctrl_get_public_ips_flags(
2384                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2387                 talloc_free(mem_ctx);
2388                 return -1;
2389         }
2390
2391         for (j=0; j<ips->num; j++) {
2392                 if (ips->ips[j].pnn == pnn) {
2393                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2394                                 DEBUG(DEBUG_ERR,
2395                                       ("Assigned IP %s not on an interface\n",
2396                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2397                                 need_takeover_run = true;
2398                         }
2399                 } else {
2400                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2401                                 DEBUG(DEBUG_ERR,
2402                                       ("IP %s incorrectly on an interface\n",
2403                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2404                                 need_takeover_run = true;
2405                         }
2406                 }
2407         }
2408
2409 done:
2410         if (need_takeover_run) {
2411                 struct ctdb_srvid_message rd;
2412                 TDB_DATA data;
2413
2414                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2415
2416                 ZERO_STRUCT(rd);
2417                 rd.pnn = ctdb->pnn;
2418                 rd.srvid = 0;
2419                 data.dptr = (uint8_t *)&rd;
2420                 data.dsize = sizeof(rd);
2421
2422                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2423                 if (ret != 0) {
2424                         DEBUG(DEBUG_ERR,
2425                               ("Failed to send takeover run request\n"));
2426                 }
2427         }
2428         talloc_free(mem_ctx);
2429         return 0;
2430 }
2431
2432
2433 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2434 {
2435         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2436
2437         if (node_pnn >= ctdb->num_nodes) {
2438                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2439                 return;
2440         }
2441
2442         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2443
2444 }
2445
2446 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2447         struct ctdb_node_map_old *nodemap,
2448         struct ctdb_node_map_old **remote_nodemaps)
2449 {
2450         uint32_t *nodes;
2451
2452         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2453         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2454                                         nodes, 0,
2455                                         CONTROL_TIMEOUT(), false, tdb_null,
2456                                         async_getnodemap_callback,
2457                                         NULL,
2458                                         remote_nodemaps) != 0) {
2459                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2460
2461                 return -1;
2462         }
2463
2464         return 0;
2465 }
2466
2467 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2468                                      TALLOC_CTX *mem_ctx)
2469 {
2470         struct ctdb_context *ctdb = rec->ctdb;
2471         uint32_t pnn = ctdb_get_pnn(ctdb);
2472         struct ctdb_node_map_old *nodemap = rec->nodemap;
2473         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2474         int ret;
2475
2476         /* When recovery daemon is started, recmaster is set to
2477          * "unknown" so it knows to start an election.
2478          */
2479         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2480                 DEBUG(DEBUG_NOTICE,
2481                       ("Initial recovery master set - forcing election\n"));
2482                 force_election(rec, pnn, nodemap);
2483                 return false;
2484         }
2485
2486         /*
2487          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2488          * but we have, then force an election and try to become the new
2489          * recmaster.
2490          */
2491         if (!ctdb_node_has_capabilities(rec->caps,
2492                                         rec->recmaster,
2493                                         CTDB_CAP_RECMASTER) &&
2494             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2495             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2496                 DEBUG(DEBUG_ERR,
2497                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2498                        " but we (node %u) have - force an election\n",
2499                        rec->recmaster, pnn));
2500                 force_election(rec, pnn, nodemap);
2501                 return false;
2502         }
2503
2504         /* Verify that the master node has not been deleted.  This
2505          * should not happen because a node should always be shutdown
2506          * before being deleted, causing a new master to be elected
2507          * before now.  However, if something strange has happened
2508          * then checking here will ensure we don't index beyond the
2509          * end of the nodemap array. */
2510         if (rec->recmaster >= nodemap->num) {
2511                 DEBUG(DEBUG_ERR,
2512                       ("Recmaster node %u has been deleted. Force election\n",
2513                        rec->recmaster));
2514                 force_election(rec, pnn, nodemap);
2515                 return false;
2516         }
2517
2518         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2519         if (nodemap->nodes[rec->recmaster].flags &
2520             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2521                 DEBUG(DEBUG_NOTICE,
2522                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2523                        rec->recmaster));
2524                 force_election(rec, pnn, nodemap);
2525                 return false;
2526         }
2527
2528         /* get nodemap from the recovery master to check if it is inactive */
2529         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2530                                    mem_ctx, &recmaster_nodemap);
2531         if (ret != 0) {
2532                 DEBUG(DEBUG_ERR,
2533                       (__location__
2534                        " Unable to get nodemap from recovery master %u\n",
2535                           rec->recmaster));
2536                 /* No election, just error */
2537                 return false;
2538         }
2539
2540
2541         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2542             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2543                 DEBUG(DEBUG_NOTICE,
2544                       ("Recmaster node %u is inactive. Force election\n",
2545                        rec->recmaster));
2546                 /*
2547                  * update our nodemap to carry the recmaster's notion of
2548                  * its own flags, so that we don't keep freezing the
2549                  * inactive recmaster node...
2550                  */
2551                 nodemap->nodes[rec->recmaster].flags =
2552                         recmaster_nodemap->nodes[rec->recmaster].flags;
2553                 force_election(rec, pnn, nodemap);
2554                 return false;
2555         }
2556
2557         return true;
2558 }
2559
2560 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2561                       TALLOC_CTX *mem_ctx)
2562 {
2563         uint32_t pnn;
2564         struct ctdb_node_map_old *nodemap=NULL;
2565         struct ctdb_node_map_old **remote_nodemaps=NULL;
2566         struct ctdb_vnn_map *vnnmap=NULL;
2567         struct ctdb_vnn_map *remote_vnnmap=NULL;
2568         uint32_t num_lmasters;
2569         int32_t debug_level;
2570         int i, j, ret;
2571         bool self_ban;
2572
2573
2574         /* verify that the main daemon is still running */
2575         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2576                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2577                 exit(-1);
2578         }
2579
2580         /* ping the local daemon to tell it we are alive */
2581         ctdb_ctrl_recd_ping(ctdb);
2582
2583         if (rec->election_timeout) {
2584                 /* an election is in progress */
2585                 return;
2586         }
2587
2588         /* read the debug level from the parent and update locally */
2589         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2590         if (ret !=0) {
2591                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2592                 return;
2593         }
2594         DEBUGLEVEL = debug_level;
2595
2596         /* get relevant tunables */
2597         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2598         if (ret != 0) {
2599                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2600                 return;
2601         }
2602
2603         /* get runstate */
2604         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2605                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2606         if (ret != 0) {
2607                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2608                 return;
2609         }
2610
2611         pnn = ctdb_get_pnn(ctdb);
2612
2613         /* get nodemap */
2614         TALLOC_FREE(rec->nodemap);
2615         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2616         if (ret != 0) {
2617                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2618                 return;
2619         }
2620         nodemap = rec->nodemap;
2621
2622         /* remember our own node flags */
2623         rec->node_flags = nodemap->nodes[pnn].flags;
2624
2625         ban_misbehaving_nodes(rec, &self_ban);
2626         if (self_ban) {
2627                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2628                 return;
2629         }
2630
2631         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2632                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2633         if (ret != 0) {
2634                 D_ERR("Failed to read recmode from local node\n");
2635                 return;
2636         }
2637
2638         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2639            also frozen and that the recmode is set to active.
2640         */
2641         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2642                 /* If this node has become inactive then we want to
2643                  * reduce the chances of it taking over the recovery
2644                  * master role when it becomes active again.  This
2645                  * helps to stabilise the recovery master role so that
2646                  * it stays on the most stable node.
2647                  */
2648                 rec->priority_time = timeval_current();
2649
2650                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2651                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2652
2653                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2654                         if (ret != 0) {
2655                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2656
2657                                 return;
2658                         }
2659                 }
2660                 if (! rec->frozen_on_inactive) {
2661                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2662                                                CTDB_CURRENT_NODE);
2663                         if (ret != 0) {
2664                                 DEBUG(DEBUG_ERR,
2665                                       (__location__ " Failed to freeze node "
2666                                        "in STOPPED or BANNED state\n"));
2667                                 return;
2668                         }
2669
2670                         rec->frozen_on_inactive = true;
2671                 }
2672
2673                 /* If this node is stopped or banned then it is not the recovery
2674                  * master, so don't do anything. This prevents stopped or banned
2675                  * node from starting election and sending unnecessary controls.
2676                  */
2677                 return;
2678         }
2679
2680         rec->frozen_on_inactive = false;
2681
2682         /* Retrieve capabilities from all connected nodes */
2683         ret = update_capabilities(rec, nodemap);
2684         if (ret != 0) {
2685                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2686                 return;
2687         }
2688
2689         if (! validate_recovery_master(rec, mem_ctx)) {
2690                 return;
2691         }
2692
2693         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2694                 /* Check if an IP takeover run is needed and trigger one if
2695                  * necessary */
2696                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2697         }
2698
2699         /* if we are not the recmaster then we do not need to check
2700            if recovery is needed
2701          */
2702         if (pnn != rec->recmaster) {
2703                 return;
2704         }
2705
2706
2707         /* ensure our local copies of flags are right */
2708         ret = update_local_flags(rec, nodemap);
2709         if (ret != 0) {
2710                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2711                 return;
2712         }
2713
2714         if (ctdb->num_nodes != nodemap->num) {
2715                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2716                 ctdb_load_nodes_file(ctdb);
2717                 return;
2718         }
2719
2720         /* verify that all active nodes agree that we are the recmaster */
2721         switch (verify_recmaster(rec, nodemap, pnn)) {
2722         case MONITOR_RECOVERY_NEEDED:
2723                 /* can not happen */
2724                 return;
2725         case MONITOR_ELECTION_NEEDED:
2726                 force_election(rec, pnn, nodemap);
2727                 return;
2728         case MONITOR_OK:
2729                 break;
2730         case MONITOR_FAILED:
2731                 return;
2732         }
2733
2734
2735         /* get the vnnmap */
2736         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2737         if (ret != 0) {
2738                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2739                 return;
2740         }
2741
2742         if (rec->need_recovery) {
2743                 /* a previous recovery didn't finish */
2744                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2745                 return;
2746         }
2747
2748         /* verify that all active nodes are in normal mode 
2749            and not in recovery mode 
2750         */
2751         switch (verify_recmode(ctdb, nodemap)) {
2752         case MONITOR_RECOVERY_NEEDED:
2753                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2754                 return;
2755         case MONITOR_FAILED:
2756                 return;
2757         case MONITOR_ELECTION_NEEDED:
2758                 /* can not happen */
2759         case MONITOR_OK:
2760                 break;
2761         }
2762
2763
2764         if (ctdb->recovery_lock != NULL) {
2765                 /* We must already hold the recovery lock */
2766                 if (!ctdb_recovery_have_lock(rec)) {
2767                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2768                         ctdb_set_culprit(rec, ctdb->pnn);
2769                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2770                         return;
2771                 }
2772         }
2773
2774
2775         /* If recoveries are disabled then there is no use doing any
2776          * nodemap or flags checks.  Recoveries might be disabled due
2777          * to "reloadnodes", so doing these checks might cause an
2778          * unnecessary recovery.  */
2779         if (ctdb_op_is_disabled(rec->recovery)) {
2780                 goto takeover_run_checks;
2781         }
2782
2783         /* get the nodemap for all active remote nodes
2784          */
2785         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2786         if (remote_nodemaps == NULL) {
2787                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2788                 return;
2789         }
2790         for(i=0; i<nodemap->num; i++) {
2791                 remote_nodemaps[i] = NULL;
2792         }
2793         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2794                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2795                 return;
2796         } 
2797
2798         /* verify that all other nodes have the same nodemap as we have
2799         */
2800         for (j=0; j<nodemap->num; j++) {
2801                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2802                         continue;
2803                 }
2804
2805                 if (remote_nodemaps[j] == NULL) {
2806                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2807                         ctdb_set_culprit(rec, j);
2808
2809                         return;
2810                 }
2811
2812                 /* if the nodes disagree on how many nodes there are
2813                    then this is a good reason to try recovery
2814                  */
2815                 if (remote_nodemaps[j]->num != nodemap->num) {
2816                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2817                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2818                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2819                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2820                         return;
2821                 }
2822
2823                 /* if the nodes disagree on which nodes exist and are
2824                    active, then that is also a good reason to do recovery
2825                  */
2826                 for (i=0;i<nodemap->num;i++) {
2827                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2828                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2829                                           nodemap->nodes[j].pnn, i, 
2830                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2831                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2832                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2833                                             vnnmap);
2834                                 return;
2835                         }
2836                 }
2837         }
2838
2839         /*
2840          * Update node flags obtained from each active node. This ensure we have
2841          * up-to-date information for all the nodes.
2842          */
2843         for (j=0; j<nodemap->num; j++) {
2844                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2845                         continue;
2846                 }
2847                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2848         }
2849
2850         for (j=0; j<nodemap->num; j++) {
2851                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2852                         continue;
2853                 }
2854
2855                 /* verify the flags are consistent
2856                 */
2857                 for (i=0; i<nodemap->num; i++) {
2858                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2859                                 continue;
2860                         }
2861                         
2862                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2863                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2864                                   nodemap->nodes[j].pnn, 
2865                                   nodemap->nodes[i].pnn, 
2866                                   remote_nodemaps[j]->nodes[i].flags,
2867                                   nodemap->nodes[i].flags));
2868                                 if (i == j) {
2869                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2870                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2871                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2872                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2873                                                     vnnmap);
2874                                         return;
2875                                 } else {
2876                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2877                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2878                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2879                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2880                                                     vnnmap);
2881                                         return;
2882                                 }
2883                         }
2884                 }
2885         }
2886
2887
2888         /* count how many active nodes there are */
2889         num_lmasters  = 0;
2890         for (i=0; i<nodemap->num; i++) {
2891                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2892                         if (ctdb_node_has_capabilities(rec->caps,
2893                                                        ctdb->nodes[i]->pnn,
2894                                                        CTDB_CAP_LMASTER)) {
2895                                 num_lmasters++;
2896                         }
2897                 }
2898         }
2899
2900
2901         /* There must be the same number of lmasters in the vnn map as
2902          * there are active nodes with the lmaster capability...  or
2903          * do a recovery.
2904          */
2905         if (vnnmap->size != num_lmasters) {
2906                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2907                           vnnmap->size, num_lmasters));
2908                 ctdb_set_culprit(rec, ctdb->pnn);
2909                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2910                 return;
2911         }
2912
2913         /* verify that all active nodes in the nodemap also exist in 
2914            the vnnmap.
2915          */
2916         for (j=0; j<nodemap->num; j++) {
2917                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2918                         continue;
2919                 }
2920                 if (nodemap->nodes[j].pnn == pnn) {
2921                         continue;
2922                 }
2923
2924                 for (i=0; i<vnnmap->size; i++) {
2925                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2926                                 break;
2927                         }
2928                 }
2929                 if (i == vnnmap->size) {
2930                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2931                                   nodemap->nodes[j].pnn));
2932                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2933                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2934                         return;
2935                 }
2936         }
2937
2938         
2939         /* verify that all other nodes have the same vnnmap
2940            and are from the same generation
2941          */
2942         for (j=0; j<nodemap->num; j++) {
2943                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2944                         continue;
2945                 }
2946                 if (nodemap->nodes[j].pnn == pnn) {
2947                         continue;
2948                 }
2949
2950                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2951                                           mem_ctx, &remote_vnnmap);
2952                 if (ret != 0) {
2953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2954                                   nodemap->nodes[j].pnn));
2955                         return;
2956                 }
2957
2958                 /* verify the vnnmap generation is the same */
2959                 if (vnnmap->generation != remote_vnnmap->generation) {
2960                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2961                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2962                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2963                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2964                         return;
2965                 }
2966
2967                 /* verify the vnnmap size is the same */
2968                 if (vnnmap->size != remote_vnnmap->size) {
2969                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2970                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2971                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2972                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2973                         return;
2974                 }
2975
2976                 /* verify the vnnmap is the same */
2977                 for (i=0;i<vnnmap->size;i++) {
2978                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2979                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2980                                           nodemap->nodes[j].pnn));
2981                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2982                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2983                                             vnnmap);
2984                                 return;
2985                         }
2986                 }
2987         }
2988
2989         /* FIXME: Add remote public IP checking to ensure that nodes
2990          * have the IP addresses that are allocated to them. */
2991
2992 takeover_run_checks:
2993
2994         /* If there are IP takeover runs requested or the previous one
2995          * failed then perform one and notify the waiters */
2996         if (!ctdb_op_is_disabled(rec->takeover_run) &&
2997             (rec->reallocate_requests || rec->need_takeover_run)) {
2998                 process_ipreallocate_requests(ctdb, rec);
2999         }
3000 }
3001
3002 static void recd_sig_term_handler(struct tevent_context *ev,
3003                                   struct tevent_signal *se, int signum,
3004                                   int count, void *dont_care,
3005                                   void *private_data)
3006 {
3007         struct ctdb_recoverd *rec = talloc_get_type_abort(
3008                 private_data, struct ctdb_recoverd);
3009
3010         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
3011         ctdb_recovery_unlock(rec);
3012         exit(0);
3013 }
3014
3015
3016 /*
3017   the main monitoring loop
3018  */
3019 static void monitor_cluster(struct ctdb_context *ctdb)
3020 {
3021         struct tevent_signal *se;
3022         struct ctdb_recoverd *rec;
3023
3024         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3025
3026         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3027         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3028
3029         rec->ctdb = ctdb;
3030         rec->recmaster = CTDB_UNKNOWN_PNN;
3031         rec->recovery_lock_handle = NULL;
3032
3033         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3034         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3035
3036         rec->recovery = ctdb_op_init(rec, "recoveries");
3037         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3038
3039         rec->priority_time = timeval_current();
3040         rec->frozen_on_inactive = false;
3041
3042         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3043                                recd_sig_term_handler, rec);
3044         if (se == NULL) {
3045                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3046                 exit(1);
3047         }
3048
3049         /* register a message port for sending memory dumps */
3050         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3051
3052         /* when a node is assigned banning credits */
3053         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3054                                         banning_handler, rec);
3055
3056         /* register a message port for recovery elections */
3057         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3058
3059         /* when nodes are disabled/enabled */
3060         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3061
3062         /* when we are asked to puch out a flag change */
3063         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3064
3065         /* register a message port for vacuum fetch */
3066         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3067
3068         /* register a message port for reloadnodes  */
3069         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3070
3071         /* register a message port for performing a takeover run */
3072         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3073
3074         /* register a message port for disabling the ip check for a short while */
3075         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3076
3077         /* register a message port for forcing a rebalance of a node next
3078            reallocation */
3079         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3080
3081         /* Register a message port for disabling takeover runs */
3082         ctdb_client_set_message_handler(ctdb,
3083                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3084                                         disable_takeover_runs_handler, rec);
3085
3086         /* Register a message port for disabling recoveries */
3087         ctdb_client_set_message_handler(ctdb,
3088                                         CTDB_SRVID_DISABLE_RECOVERIES,
3089                                         disable_recoveries_handler, rec);
3090
3091         /* register a message port for detaching database */
3092         ctdb_client_set_message_handler(ctdb,
3093                                         CTDB_SRVID_DETACH_DATABASE,
3094                                         detach_database_handler, rec);
3095
3096         for (;;) {
3097                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3098                 struct timeval start;
3099                 double elapsed;
3100
3101                 if (!mem_ctx) {
3102                         DEBUG(DEBUG_CRIT,(__location__
3103                                           " Failed to create temp context\n"));
3104                         exit(-1);
3105                 }
3106
3107                 start = timeval_current();
3108                 main_loop(ctdb, rec, mem_ctx);
3109                 talloc_free(mem_ctx);
3110
3111                 /* we only check for recovery once every second */
3112                 elapsed = timeval_elapsed(&start);
3113                 if (elapsed < ctdb->tunable.recover_interval) {
3114                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3115                                           - elapsed);
3116                 }
3117         }
3118 }
3119
3120 /*
3121   event handler for when the main ctdbd dies
3122  */
3123 static void ctdb_recoverd_parent(struct tevent_context *ev,
3124                                  struct tevent_fd *fde,
3125                                  uint16_t flags, void *private_data)
3126 {
3127         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3128         _exit(1);
3129 }
3130
3131 /*
3132   called regularly to verify that the recovery daemon is still running
3133  */
3134 static void ctdb_check_recd(struct tevent_context *ev,
3135                             struct tevent_timer *te,
3136                             struct timeval yt, void *p)
3137 {
3138         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3139
3140         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3141                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3142
3143                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3144                                  ctdb_restart_recd, ctdb);
3145
3146                 return;
3147         }
3148
3149         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3150                          timeval_current_ofs(30, 0),
3151                          ctdb_check_recd, ctdb);
3152 }
3153
3154 static void recd_sig_child_handler(struct tevent_context *ev,
3155                                    struct tevent_signal *se, int signum,
3156                                    int count, void *dont_care,
3157                                    void *private_data)
3158 {
3159 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3160         int status;
3161         pid_t pid = -1;
3162
3163         while (pid != 0) {
3164                 pid = waitpid(-1, &status, WNOHANG);
3165                 if (pid == -1) {
3166                         if (errno != ECHILD) {
3167                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3168                         }
3169                         return;
3170                 }
3171                 if (pid > 0) {
3172                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3173                 }
3174         }
3175 }
3176
3177 /*
3178   startup the recovery daemon as a child of the main ctdb daemon
3179  */
3180 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3181 {
3182         int fd[2];
3183         struct tevent_signal *se;
3184         struct tevent_fd *fde;
3185         int ret;
3186
3187         if (pipe(fd) != 0) {
3188                 return -1;
3189         }
3190
3191         ctdb->recoverd_pid = ctdb_fork(ctdb);
3192         if (ctdb->recoverd_pid == -1) {
3193                 return -1;
3194         }
3195
3196         if (ctdb->recoverd_pid != 0) {
3197                 talloc_free(ctdb->recd_ctx);
3198                 ctdb->recd_ctx = talloc_new(ctdb);
3199                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3200
3201                 close(fd[0]);
3202                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3203                                  timeval_current_ofs(30, 0),
3204                                  ctdb_check_recd, ctdb);
3205                 return 0;
3206         }
3207
3208         close(fd[1]);
3209
3210         srandom(getpid() ^ time(NULL));
3211
3212         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3213         if (ret != 0) {
3214                 return -1;
3215         }
3216
3217         prctl_set_comment("ctdb_recoverd");
3218         if (switch_from_server_to_client(ctdb) != 0) {
3219                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3220                 exit(1);
3221         }
3222
3223         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3224
3225         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3226                             ctdb_recoverd_parent, &fd[0]);
3227         tevent_fd_set_auto_close(fde);
3228
3229         /* set up a handler to pick up sigchld */
3230         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3231                                recd_sig_child_handler, ctdb);
3232         if (se == NULL) {
3233                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3234                 exit(1);
3235         }
3236
3237         monitor_cluster(ctdb);
3238
3239         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3240         return -1;
3241 }
3242
3243 /*
3244   shutdown the recovery daemon
3245  */
3246 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3247 {
3248         if (ctdb->recoverd_pid == 0) {
3249                 return;
3250         }
3251
3252         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3253         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3254
3255         TALLOC_FREE(ctdb->recd_ctx);
3256         TALLOC_FREE(ctdb->recd_ping_count);
3257 }
3258
3259 static void ctdb_restart_recd(struct tevent_context *ev,
3260                               struct tevent_timer *te,
3261                               struct timeval t, void *private_data)
3262 {
3263         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3264
3265         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3266         ctdb_stop_recoverd(ctdb);
3267         ctdb_start_recoverd(ctdb);
3268 }