ctdb-common: Rename system utility files
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36 #include "lib/util/util_process.h"
37
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/system_socket.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "ctdb_cluster_mutex.h"
46
/*
 * One queued SRVID request.  Entries are chained through next/prev so
 * they can be managed with the DLIST_* macros.
 */
struct srvid_list {
	struct srvid_list *next;
	struct srvid_list *prev;
	struct ctdb_srvid_message *request;
};
52
/* Container holding the head of a list of pending SRVID requests */
struct srvid_requests {
	/* first queued entry, or NULL while the list is empty */
	struct srvid_list *requests;
};
56
57 static void srvid_request_reply(struct ctdb_context *ctdb,
58                                 struct ctdb_srvid_message *request,
59                                 TDB_DATA result)
60 {
61         /* Someone that sent srvid==0 does not want a reply */
62         if (request->srvid == 0) {
63                 talloc_free(request);
64                 return;
65         }
66
67         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
68                                      result) == 0) {
69                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
70                                   (unsigned)request->pnn,
71                                   (unsigned long long)request->srvid));
72         } else {
73                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
74                                  (unsigned)request->pnn,
75                                  (unsigned long long)request->srvid));
76         }
77
78         talloc_free(request);
79 }
80
81 static void srvid_requests_reply(struct ctdb_context *ctdb,
82                                  struct srvid_requests **requests,
83                                  TDB_DATA result)
84 {
85         struct srvid_list *r;
86
87         if (*requests == NULL) {
88                 return;
89         }
90
91         for (r = (*requests)->requests; r != NULL; r = r->next) {
92                 srvid_request_reply(ctdb, r->request, result);
93         }
94
95         /* Free the list structure... */
96         TALLOC_FREE(*requests);
97 }
98
99 static void srvid_request_add(struct ctdb_context *ctdb,
100                               struct srvid_requests **requests,
101                               struct ctdb_srvid_message *request)
102 {
103         struct srvid_list *t;
104         int32_t ret;
105         TDB_DATA result;
106
107         if (*requests == NULL) {
108                 *requests = talloc_zero(ctdb, struct srvid_requests);
109                 if (*requests == NULL) {
110                         goto nomem;
111                 }
112         }
113
114         t = talloc_zero(*requests, struct srvid_list);
115         if (t == NULL) {
116                 /* If *requests was just allocated above then free it */
117                 if ((*requests)->requests == NULL) {
118                         TALLOC_FREE(*requests);
119                 }
120                 goto nomem;
121         }
122
123         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
124         DLIST_ADD((*requests)->requests, t);
125
126         return;
127
128 nomem:
129         /* Failed to add the request to the list.  Send a fail. */
130         DEBUG(DEBUG_ERR, (__location__
131                           " Out of memory, failed to queue SRVID request\n"));
132         ret = -ENOMEM;
133         result.dsize = sizeof(ret);
134         result.dptr = (uint8_t *)&ret;
135         srvid_request_reply(ctdb, request, result);
136 }
137
/*
 * State for an operation (takeover runs, recoveries, ...) that can be
 * switched off for a limited time.
 */
struct ctdb_op_state {
	/* non-NULL while the operation is disabled */
	struct tevent_timer *timer;
	/* set by ctdb_op_begin(), cleared by ctdb_op_end() */
	bool in_progress;
	/* label used in log messages */
	const char *name;
};
145
146 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
147 {
148         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
149
150         if (state != NULL) {
151                 state->in_progress = false;
152                 state->name = name;
153         }
154
155         return state;
156 }
157
158 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
159 {
160         return state->timer != NULL;
161 }
162
163 static bool ctdb_op_begin(struct ctdb_op_state *state)
164 {
165         if (ctdb_op_is_disabled(state)) {
166                 DEBUG(DEBUG_NOTICE,
167                       ("Unable to begin - %s are disabled\n", state->name));
168                 return false;
169         }
170
171         state->in_progress = true;
172         return true;
173 }
174
175 static bool ctdb_op_end(struct ctdb_op_state *state)
176 {
177         return state->in_progress = false;
178 }
179
180 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
181 {
182         return state->in_progress;
183 }
184
185 static void ctdb_op_enable(struct ctdb_op_state *state)
186 {
187         TALLOC_FREE(state->timer);
188 }
189
190 static void ctdb_op_timeout_handler(struct tevent_context *ev,
191                                     struct tevent_timer *te,
192                                     struct timeval yt, void *p)
193 {
194         struct ctdb_op_state *state =
195                 talloc_get_type(p, struct ctdb_op_state);
196
197         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
198         ctdb_op_enable(state);
199 }
200
201 static int ctdb_op_disable(struct ctdb_op_state *state,
202                            struct tevent_context *ev,
203                            uint32_t timeout)
204 {
205         if (timeout == 0) {
206                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
207                 ctdb_op_enable(state);
208                 return 0;
209         }
210
211         if (state->in_progress) {
212                 DEBUG(DEBUG_ERR,
213                       ("Unable to disable %s - in progress\n", state->name));
214                 return -EAGAIN;
215         }
216
217         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
218                             state->name, timeout));
219
220         /* Clear any old timers */
221         talloc_free(state->timer);
222
223         /* Arrange for the timeout to occur */
224         state->timer = tevent_add_timer(ev, state,
225                                         timeval_current_ofs(timeout, 0),
226                                         ctdb_op_timeout_handler, state);
227         if (state->timer == NULL) {
228                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
229                 return -ENOMEM;
230         }
231
232         return 0;
233 }
234
/* Per-node record of how often, and when last, a node was blamed */
struct ctdb_banning_state {
	/* accumulated culprit count */
	uint32_t count;
	/* time of the most recent report */
	struct timeval last_reported_time;
};
239
/*
 * Private state of the recovery daemon.
 */
struct ctdb_recoverd {
	struct ctdb_context *ctdb;

	/* cluster view */
	uint32_t recmaster;
	uint32_t last_culprit_node;	/* updated by ctdb_set_culprit_count() */
	struct ctdb_node_map_old *nodemap;
	struct timeval priority_time;

	/* pending work */
	bool need_takeover_run;
	bool need_recovery;

	/* flags of this node; tested against NODE_FLAGS_INACTIVE */
	uint32_t node_flags;

	/* election timers; election_timeout is NULL when no election runs */
	struct tevent_timer *send_election_te;
	struct tevent_timer *election_timeout;

	/* queued SRVID requests */
	struct srvid_requests *reallocate_requests;

	/* operations that can be disabled temporarily */
	struct ctdb_op_state *takeover_run;
	struct ctdb_op_state *recovery;

	struct ctdb_iface_list_old *ifaces;
	uint32_t *force_rebalance_nodes;

	/* replaced by update_capabilities() */
	struct ctdb_node_capabilities *caps;

	bool frozen_on_inactive;

	/* non-NULL while the recovery lock is held */
	struct ctdb_cluster_mutex_handle *recovery_lock_handle;
};
263
/*
 * Absolute timeouts derived from tunables.  Both macros expect a
 * variable named ctdb (struct ctdb_context *) in the calling scope.
 */
#define CONTROL_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_interval, 0)
266
/*
 * Forward declaration of a timer-style callback; its definition is
 * outside the part of the file shown here.
 */
static void ctdb_restart_recd(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval t,
			      void *private_data);
270
271 /*
272   ban a node for a period of time
273  */
274 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
275 {
276         int ret;
277         struct ctdb_context *ctdb = rec->ctdb;
278         struct ctdb_ban_state bantime;
279
280         if (!ctdb_validate_pnn(ctdb, pnn)) {
281                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
282                 return;
283         }
284
285         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
286
287         bantime.pnn  = pnn;
288         bantime.time = ban_time;
289
290         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
291         if (ret != 0) {
292                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
293                 return;
294         }
295
296 }
297
/* Possible outcomes of a monitoring pass */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
299
300
301 /*
302   remember the trouble maker
303  */
304 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
305 {
306         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
307         struct ctdb_banning_state *ban_state;
308
309         if (culprit > ctdb->num_nodes) {
310                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
311                 return;
312         }
313
314         /* If we are banned or stopped, do not set other nodes as culprits */
315         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
316                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
317                 return;
318         }
319
320         if (ctdb->nodes[culprit]->ban_state == NULL) {
321                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
322                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
323
324                 
325         }
326         ban_state = ctdb->nodes[culprit]->ban_state;
327         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
328                 /* this was the first time in a long while this node
329                    misbehaved so we will forgive any old transgressions.
330                 */
331                 ban_state->count = 0;
332         }
333
334         ban_state->count += count;
335         ban_state->last_reported_time = timeval_current();
336         rec->last_culprit_node = culprit;
337 }
338
/*
 * Blame a node once: shorthand for ctdb_set_culprit_count() with a
 * count of 1.
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t single_report = 1;

	ctdb_set_culprit_count(rec, culprit, single_report);
}
346
347 /*
348   Retrieve capabilities from all connected nodes
349  */
350 static int update_capabilities(struct ctdb_recoverd *rec,
351                                struct ctdb_node_map_old *nodemap)
352 {
353         uint32_t *capp;
354         TALLOC_CTX *tmp_ctx;
355         struct ctdb_node_capabilities *caps;
356         struct ctdb_context *ctdb = rec->ctdb;
357
358         tmp_ctx = talloc_new(rec);
359         CTDB_NO_MEMORY(ctdb, tmp_ctx);
360
361         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
362                                      CONTROL_TIMEOUT(), nodemap);
363
364         if (caps == NULL) {
365                 DEBUG(DEBUG_ERR,
366                       (__location__ " Failed to get node capabilities\n"));
367                 talloc_free(tmp_ctx);
368                 return -1;
369         }
370
371         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
372         if (capp == NULL) {
373                 DEBUG(DEBUG_ERR,
374                       (__location__
375                        " Capabilities don't include current node.\n"));
376                 talloc_free(tmp_ctx);
377                 return -1;
378         }
379         ctdb->capabilities = *capp;
380
381         TALLOC_FREE(rec->caps);
382         rec->caps = talloc_steal(rec, caps);
383
384         talloc_free(tmp_ctx);
385         return 0;
386 }
387
388 /*
389   change recovery mode on all nodes
390  */
391 static int set_recovery_mode(struct ctdb_context *ctdb,
392                              struct ctdb_recoverd *rec,
393                              struct ctdb_node_map_old *nodemap,
394                              uint32_t rec_mode)
395 {
396         TDB_DATA data;
397         uint32_t *nodes;
398         TALLOC_CTX *tmp_ctx;
399
400         tmp_ctx = talloc_new(ctdb);
401         CTDB_NO_MEMORY(ctdb, tmp_ctx);
402
403         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
404
405         data.dsize = sizeof(uint32_t);
406         data.dptr = (unsigned char *)&rec_mode;
407
408         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
409                                         nodes, 0,
410                                         CONTROL_TIMEOUT(),
411                                         false, data,
412                                         NULL, NULL,
413                                         NULL) != 0) {
414                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
415                 talloc_free(tmp_ctx);
416                 return -1;
417         }
418
419         talloc_free(tmp_ctx);
420         return 0;
421 }
422
423 /*
424   ensure all other nodes have attached to any databases that we have
425  */
426 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
427                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
428 {
429         int i, j, db, ret;
430         struct ctdb_dbid_map_old *remote_dbmap;
431
432         /* verify that all other nodes have all our databases */
433         for (j=0; j<nodemap->num; j++) {
434                 /* we don't need to ourself ourselves */
435                 if (nodemap->nodes[j].pnn == pnn) {
436                         continue;
437                 }
438                 /* don't check nodes that are unavailable */
439                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
440                         continue;
441                 }
442
443                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
444                                          mem_ctx, &remote_dbmap);
445                 if (ret != 0) {
446                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
447                         return -1;
448                 }
449
450                 /* step through all local databases */
451                 for (db=0; db<dbmap->num;db++) {
452                         const char *name;
453
454
455                         for (i=0;i<remote_dbmap->num;i++) {
456                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
457                                         break;
458                                 }
459                         }
460                         /* the remote node already have this database */
461                         if (i!=remote_dbmap->num) {
462                                 continue;
463                         }
464                         /* ok so we need to create this database */
465                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
466                                                   dbmap->dbs[db].db_id, mem_ctx,
467                                                   &name);
468                         if (ret != 0) {
469                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
470                                 return -1;
471                         }
472                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
473                                                  nodemap->nodes[j].pnn,
474                                                  mem_ctx, name,
475                                                  dbmap->dbs[db].flags, NULL);
476                         if (ret != 0) {
477                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
478                                 return -1;
479                         }
480                 }
481         }
482
483         return 0;
484 }
485
486
487 /*
488   ensure we are attached to any databases that anyone else is attached to
489  */
490 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
491                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
492 {
493         int i, j, db, ret;
494         struct ctdb_dbid_map_old *remote_dbmap;
495
496         /* verify that we have all database any other node has */
497         for (j=0; j<nodemap->num; j++) {
498                 /* we don't need to ourself ourselves */
499                 if (nodemap->nodes[j].pnn == pnn) {
500                         continue;
501                 }
502                 /* don't check nodes that are unavailable */
503                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
504                         continue;
505                 }
506
507                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
508                                          mem_ctx, &remote_dbmap);
509                 if (ret != 0) {
510                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
511                         return -1;
512                 }
513
514                 /* step through all databases on the remote node */
515                 for (db=0; db<remote_dbmap->num;db++) {
516                         const char *name;
517
518                         for (i=0;i<(*dbmap)->num;i++) {
519                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
520                                         break;
521                                 }
522                         }
523                         /* we already have this db locally */
524                         if (i!=(*dbmap)->num) {
525                                 continue;
526                         }
527                         /* ok so we need to create this database and
528                            rebuild dbmap
529                          */
530                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
531                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
532                         if (ret != 0) {
533                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
534                                           nodemap->nodes[j].pnn));
535                                 return -1;
536                         }
537                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
538                                            mem_ctx, name,
539                                            remote_dbmap->dbs[db].flags, NULL);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
542                                 return -1;
543                         }
544                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
545                         if (ret != 0) {
546                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
547                                 return -1;
548                         }
549                 }
550         }
551
552         return 0;
553 }
554
555 /*
556   update flags on all active nodes
557  */
558 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
559 {
560         int ret;
561
562         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
563                 if (ret != 0) {
564                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
565                 return -1;
566         }
567
568         return 0;
569 }
570
/*
 * Completion callback for a vacuum fetch call: the result is not
 * needed, so the call state is simply released.
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
	(void)talloc_free(state);
}
578
579
580 /**
581  * Process one elements of the vacuum fetch list:
582  * Migrate it over to us with the special flag
583  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
584  */
585 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
586                                      uint32_t pnn,
587                                      struct ctdb_rec_data_old *r)
588 {
589         struct ctdb_client_call_state *state;
590         TDB_DATA data;
591         struct ctdb_ltdb_header *hdr;
592         struct ctdb_call call;
593
594         ZERO_STRUCT(call);
595         call.call_id = CTDB_NULL_FUNC;
596         call.flags = CTDB_IMMEDIATE_MIGRATION;
597         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
598
599         call.key.dptr = &r->data[0];
600         call.key.dsize = r->keylen;
601
602         /* ensure we don't block this daemon - just skip a record if we can't get
603            the chainlock */
604         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
605                 return true;
606         }
607
608         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
609         if (data.dptr == NULL) {
610                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
611                 return true;
612         }
613
614         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
615                 free(data.dptr);
616                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
617                 return true;
618         }
619
620         hdr = (struct ctdb_ltdb_header *)data.dptr;
621         if (hdr->dmaster == pnn) {
622                 /* its already local */
623                 free(data.dptr);
624                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
625                 return true;
626         }
627
628         free(data.dptr);
629
630         state = ctdb_call_send(ctdb_db, &call);
631         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
632         if (state == NULL) {
633                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
634                 return false;
635         }
636         state->async.fn = vacuum_fetch_callback;
637         state->async.private_data = NULL;
638
639         return true;
640 }
641
642
643 /*
644   handler for vacuum fetch
645 */
646 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
647                                  void *private_data)
648 {
649         struct ctdb_recoverd *rec = talloc_get_type(
650                 private_data, struct ctdb_recoverd);
651         struct ctdb_context *ctdb = rec->ctdb;
652         struct ctdb_marshall_buffer *recs;
653         int ret, i;
654         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
655         const char *name;
656         struct ctdb_dbid_map_old *dbmap=NULL;
657         uint8_t db_flags = 0;
658         struct ctdb_db_context *ctdb_db;
659         struct ctdb_rec_data_old *r;
660
661         recs = (struct ctdb_marshall_buffer *)data.dptr;
662
663         if (recs->count == 0) {
664                 goto done;
665         }
666
667         /* work out if the database is persistent */
668         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
671                 goto done;
672         }
673
674         for (i=0;i<dbmap->num;i++) {
675                 if (dbmap->dbs[i].db_id == recs->db_id) {
676                         db_flags = dbmap->dbs[i].flags;
677                         break;
678                 }
679         }
680         if (i == dbmap->num) {
681                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
682                 goto done;
683         }
684
685         /* find the name of this database */
686         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
687                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
688                 goto done;
689         }
690
691         /* attach to it */
692         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
693         if (ctdb_db == NULL) {
694                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
695                 goto done;
696         }
697
698         r = (struct ctdb_rec_data_old *)&recs->data[0];
699         while (recs->count) {
700                 bool ok;
701
702                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
703                 if (!ok) {
704                         break;
705                 }
706
707                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
708                 recs->count--;
709         }
710
711 done:
712         talloc_free(tmp_ctx);
713 }
714
715
716 /*
717  * handler for database detach
718  */
719 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
720                                     void *private_data)
721 {
722         struct ctdb_recoverd *rec = talloc_get_type(
723                 private_data, struct ctdb_recoverd);
724         struct ctdb_context *ctdb = rec->ctdb;
725         uint32_t db_id;
726         struct ctdb_db_context *ctdb_db;
727
728         if (data.dsize != sizeof(db_id)) {
729                 return;
730         }
731         db_id = *(uint32_t *)data.dptr;
732
733         ctdb_db = find_ctdb_db(ctdb, db_id);
734         if (ctdb_db == NULL) {
735                 /* database is not attached */
736                 return;
737         }
738
739         DLIST_REMOVE(ctdb->db_list, ctdb_db);
740
741         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
742                              ctdb_db->db_name));
743         talloc_free(ctdb_db);
744 }
745
/*
 * Timer callback used by ctdb_wait_timeout(): p points at a uint32_t
 * flag which is set to 1 to signal that the wait is over.
 */
static void ctdb_wait_handler(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval yt, void *p)
{
	uint32_t *expired_flag = (uint32_t *)p;

	*expired_flag = 1;
}
756
757 /*
758   wait for a given number of seconds
759  */
760 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
761 {
762         uint32_t timed_out = 0;
763         time_t usecs = (secs - (time_t)secs) * 1000000;
764         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
765                          ctdb_wait_handler, &timed_out);
766         while (!timed_out) {
767                 tevent_loop_once(ctdb->ev);
768         }
769 }
770
771 /*
772   called when an election times out (ends)
773  */
774 static void ctdb_election_timeout(struct tevent_context *ev,
775                                   struct tevent_timer *te,
776                                   struct timeval t, void *p)
777 {
778         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
779         rec->election_timeout = NULL;
780         fast_start = false;
781
782         DEBUG(DEBUG_WARNING,("Election period ended\n"));
783 }
784
785
786 /*
787   wait for an election to finish. It finished election_timeout seconds after
788   the last election packet is received
789  */
790 static void ctdb_wait_election(struct ctdb_recoverd *rec)
791 {
792         struct ctdb_context *ctdb = rec->ctdb;
793         while (rec->election_timeout) {
794                 tevent_loop_once(ctdb->ev);
795         }
796 }
797
798 /*
799   Update our local flags from all remote connected nodes. 
800   This is only run when we are or we belive we are the recovery master
801  */
802 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
803 {
804         int j;
805         struct ctdb_context *ctdb = rec->ctdb;
806         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
807
808         /* get the nodemap for all active remote nodes and verify
809            they are the same as for this node
810          */
811         for (j=0; j<nodemap->num; j++) {
812                 struct ctdb_node_map_old *remote_nodemap=NULL;
813                 int ret;
814
815                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
816                         continue;
817                 }
818                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
819                         continue;
820                 }
821
822                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
823                                            mem_ctx, &remote_nodemap);
824                 if (ret != 0) {
825                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
826                                   nodemap->nodes[j].pnn));
827                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
828                         talloc_free(mem_ctx);
829                         return -1;
830                 }
831                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
832                         /* We should tell our daemon about this so it
833                            updates its flags or else we will log the same 
834                            message again in the next iteration of recovery.
835                            Since we are the recovery master we can just as
836                            well update the flags on all nodes.
837                         */
838                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
839                         if (ret != 0) {
840                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
841                                 return -1;
842                         }
843
844                         /* Update our local copy of the flags in the recovery
845                            daemon.
846                         */
847                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
848                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
849                                  nodemap->nodes[j].flags));
850                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
851                 }
852                 talloc_free(remote_nodemap);
853         }
854         talloc_free(mem_ctx);
855         return 0;
856 }
857
858
859 /* Create a new random generation id.
860    The generation id can not be the INVALID_GENERATION id
861 */
862 static uint32_t new_generation(void)
863 {
864         uint32_t generation;
865
866         while (1) {
867                 generation = random();
868
869                 if (generation != INVALID_GENERATION) {
870                         break;
871                 }
872         }
873
874         return generation;
875 }
876
877 static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
878 {
879         return (rec->recovery_lock_handle != NULL);
880 }
881
/* Outcome of one recovery lock attempt, filled in by take_reclock_handler() */
struct hold_reclock_state {
	bool done;	/* set once the handler has been called */
	bool locked;	/* true only if the handler saw status '0' */
	double latency;	/* latency handed to the handler with status '0' */
};
887
888 static void take_reclock_handler(char status,
889                                  double latency,
890                                  void *private_data)
891 {
892         struct hold_reclock_state *s =
893                 (struct hold_reclock_state *) private_data;
894
895         switch (status) {
896         case '0':
897                 s->latency = latency;
898                 break;
899
900         case '1':
901                 DEBUG(DEBUG_ERR,
902                       ("Unable to take recovery lock - contention\n"));
903                 break;
904
905         default:
906                 DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
907         }
908
909         s->done = true;
910         s->locked = (status == '0') ;
911 }
912
913 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);
914
915 static void lost_reclock_handler(void *private_data)
916 {
917         struct ctdb_recoverd *rec = talloc_get_type_abort(
918                 private_data, struct ctdb_recoverd);
919
920         DEBUG(DEBUG_ERR,
921               ("Recovery lock helper terminated unexpectedly - "
922                "trying to retake recovery lock\n"));
923         TALLOC_FREE(rec->recovery_lock_handle);
924         if (! ctdb_recovery_lock(rec)) {
925                 DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
926         }
927 }
928
929 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
930 {
931         struct ctdb_context *ctdb = rec->ctdb;
932         struct ctdb_cluster_mutex_handle *h;
933         struct hold_reclock_state s = {
934                 .done = false,
935                 .locked = false,
936                 .latency = 0,
937         };
938
939         h = ctdb_cluster_mutex(rec, ctdb, ctdb->recovery_lock, 0,
940                                take_reclock_handler, &s,
941                                lost_reclock_handler, rec);
942         if (h == NULL) {
943                 return false;
944         }
945
946         while (!s.done) {
947                 tevent_loop_once(ctdb->ev);
948         }
949
950         if (! s.locked) {
951                 talloc_free(h);
952                 return false;
953         }
954
955         rec->recovery_lock_handle = h;
956         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(),
957                                            s.latency);
958
959         return true;
960 }
961
962 static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
963 {
964         if (rec->recovery_lock_handle != NULL) {
965                 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
966                 TALLOC_FREE(rec->recovery_lock_handle);
967         }
968 }
969
970 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
971 {
972         struct ctdb_context *ctdb = rec->ctdb;
973         int i;
974         struct ctdb_banning_state *ban_state;
975
976         *self_ban = false;
977         for (i=0; i<ctdb->num_nodes; i++) {
978                 if (ctdb->nodes[i]->ban_state == NULL) {
979                         continue;
980                 }
981                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
982                 if (ban_state->count < 2*ctdb->num_nodes) {
983                         continue;
984                 }
985
986                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
987                         ctdb->nodes[i]->pnn, ban_state->count,
988                         ctdb->tunable.recovery_ban_period));
989                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
990                 ban_state->count = 0;
991
992                 /* Banning ourself? */
993                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
994                         *self_ban = true;
995                 }
996         }
997 }
998
/* State shared between helper_run() and helper_handler() */
struct helper_state {
	int fd[2];	/* pipe: [0] is read here, [1] is given to the helper */
	pid_t pid;	/* helper process id, -1 until it has been started */
	int result;	/* value read from the pipe, EPIPE on a bad read */
	bool done;	/* set by helper_handler() once a read was attempted */
};
1005
1006 static void helper_handler(struct tevent_context *ev,
1007                            struct tevent_fd *fde,
1008                            uint16_t flags, void *private_data)
1009 {
1010         struct helper_state *state = talloc_get_type_abort(
1011                 private_data, struct helper_state);
1012         int ret;
1013
1014         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1015         if (ret != sizeof(state->result)) {
1016                 state->result = EPIPE;
1017         }
1018
1019         state->done = true;
1020 }
1021
1022 static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1023                       const char *prog, const char *arg, const char *type)
1024 {
1025         struct helper_state *state;
1026         struct tevent_fd *fde;
1027         const char **args;
1028         int nargs, ret;
1029         uint32_t recmaster = rec->recmaster;
1030
1031         state = talloc_zero(mem_ctx, struct helper_state);
1032         if (state == NULL) {
1033                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1034                 return -1;
1035         }
1036
1037         state->pid = -1;
1038
1039         ret = pipe(state->fd);
1040         if (ret != 0) {
1041                 DEBUG(DEBUG_ERR,
1042                       ("Failed to create pipe for %s helper\n", type));
1043                 goto fail;
1044         }
1045
1046         set_close_on_exec(state->fd[0]);
1047
1048         nargs = 4;
1049         args = talloc_array(state, const char *, nargs);
1050         if (args == NULL) {
1051                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1052                 goto fail;
1053         }
1054
1055         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1056         if (args[0] == NULL) {
1057                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1058                 goto fail;
1059         }
1060         args[1] = rec->ctdb->daemon.name;
1061         args[2] = arg;
1062         args[3] = NULL;
1063
1064         if (args[2] == NULL) {
1065                 nargs = 3;
1066         }
1067
1068         state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
1069         if (state->pid == -1) {
1070                 DEBUG(DEBUG_ERR,
1071                       ("Failed to create child for %s helper\n", type));
1072                 goto fail;
1073         }
1074
1075         close(state->fd[1]);
1076         state->fd[1] = -1;
1077
1078         state->done = false;
1079
1080         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1081                             TEVENT_FD_READ, helper_handler, state);
1082         if (fde == NULL) {
1083                 goto fail;
1084         }
1085         tevent_fd_set_auto_close(fde);
1086
1087         while (!state->done) {
1088                 tevent_loop_once(rec->ctdb->ev);
1089
1090                 /* If recmaster changes, we have lost election */
1091                 if (recmaster != rec->recmaster) {
1092                         D_ERR("Recmaster changed to %u, aborting %s\n",
1093                               rec->recmaster, type);
1094                         state->result = 1;
1095                         break;
1096                 }
1097         }
1098
1099         close(state->fd[0]);
1100         state->fd[0] = -1;
1101
1102         if (state->result != 0) {
1103                 goto fail;
1104         }
1105
1106         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1107         talloc_free(state);
1108         return 0;
1109
1110 fail:
1111         if (state->fd[0] != -1) {
1112                 close(state->fd[0]);
1113         }
1114         if (state->fd[1] != -1) {
1115                 close(state->fd[1]);
1116         }
1117         if (state->pid != -1) {
1118                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1119         }
1120         talloc_free(state);
1121         return -1;
1122 }
1123
1124
1125 static int ctdb_takeover(struct ctdb_recoverd *rec,
1126                          uint32_t *force_rebalance_nodes)
1127 {
1128         static char prog[PATH_MAX+1] = "";
1129         char *arg;
1130         int i;
1131
1132         if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
1133                              "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
1134                              "ctdb_takeover_helper")) {
1135                 ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
1136         }
1137
1138         arg = NULL;
1139         for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
1140                 uint32_t pnn = force_rebalance_nodes[i];
1141                 if (arg == NULL) {
1142                         arg = talloc_asprintf(rec, "%u", pnn);
1143                 } else {
1144                         arg = talloc_asprintf_append(arg, ",%u", pnn);
1145                 }
1146                 if (arg == NULL) {
1147                         DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1148                         return -1;
1149                 }
1150         }
1151
1152         return helper_run(rec, rec, prog, arg, "takeover");
1153 }
1154
1155 static bool do_takeover_run(struct ctdb_recoverd *rec,
1156                             struct ctdb_node_map_old *nodemap)
1157 {
1158         uint32_t *nodes = NULL;
1159         struct ctdb_disable_message dtr;
1160         TDB_DATA data;
1161         int i;
1162         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1163         int ret;
1164         bool ok;
1165
1166         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1167
1168         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1169                 DEBUG(DEBUG_ERR, (__location__
1170                                   " takeover run already in progress \n"));
1171                 ok = false;
1172                 goto done;
1173         }
1174
1175         if (!ctdb_op_begin(rec->takeover_run)) {
1176                 ok = false;
1177                 goto done;
1178         }
1179
1180         /* Disable IP checks (takeover runs, really) on other nodes
1181          * while doing this takeover run.  This will stop those other
1182          * nodes from triggering takeover runs when think they should
1183          * be hosting an IP but it isn't yet on an interface.  Don't
1184          * wait for replies since a failure here might cause some
1185          * noise in the logs but will not actually cause a problem.
1186          */
1187         ZERO_STRUCT(dtr);
1188         dtr.srvid = 0; /* No reply */
1189         dtr.pnn = -1;
1190
1191         data.dptr  = (uint8_t*)&dtr;
1192         data.dsize = sizeof(dtr);
1193
1194         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1195
1196         /* Disable for 60 seconds.  This can be a tunable later if
1197          * necessary.
1198          */
1199         dtr.timeout = 60;
1200         for (i = 0; i < talloc_array_length(nodes); i++) {
1201                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1202                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1203                                              data) != 0) {
1204                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1205                 }
1206         }
1207
1208         ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
1209
1210         /* Reenable takeover runs and IP checks on other nodes */
1211         dtr.timeout = 0;
1212         for (i = 0; i < talloc_array_length(nodes); i++) {
1213                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1214                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1215                                              data) != 0) {
1216                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1217                 }
1218         }
1219
1220         if (ret != 0) {
1221                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1222                 ok = false;
1223                 goto done;
1224         }
1225
1226         ok = true;
1227         /* Takeover run was successful so clear force rebalance targets */
1228         if (rebalance_nodes == rec->force_rebalance_nodes) {
1229                 TALLOC_FREE(rec->force_rebalance_nodes);
1230         } else {
1231                 DEBUG(DEBUG_WARNING,
1232                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1233         }
1234 done:
1235         rec->need_takeover_run = !ok;
1236         talloc_free(nodes);
1237         ctdb_op_end(rec->takeover_run);
1238
1239         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1240         return ok;
1241 }
1242
1243 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1244 {
1245         static char prog[PATH_MAX+1] = "";
1246         const char *arg;
1247
1248         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1249                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1250                              "ctdb_recovery_helper")) {
1251                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1252         }
1253
1254         arg = talloc_asprintf(mem_ctx, "%u", new_generation());
1255         if (arg == NULL) {
1256                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1257                 return -1;
1258         }
1259
1260         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1261
1262         return helper_run(rec, mem_ctx, prog, arg, "recovery");
1263 }
1264
1265 /*
1266   we are the recmaster, and recovery is needed - start a recovery run
1267  */
1268 static int do_recovery(struct ctdb_recoverd *rec,
1269                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1270                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1271 {
1272         struct ctdb_context *ctdb = rec->ctdb;
1273         int i, ret;
1274         struct ctdb_dbid_map_old *dbmap;
1275         bool self_ban;
1276
1277         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1278
1279         /* Check if the current node is still the recmaster.  It's possible that
1280          * re-election has changed the recmaster.
1281          */
1282         if (pnn != rec->recmaster) {
1283                 DEBUG(DEBUG_NOTICE,
1284                       ("Recovery master changed to %u, aborting recovery\n",
1285                        rec->recmaster));
1286                 return -1;
1287         }
1288
1289         /* if recovery fails, force it again */
1290         rec->need_recovery = true;
1291
1292         if (!ctdb_op_begin(rec->recovery)) {
1293                 return -1;
1294         }
1295
1296         if (rec->election_timeout) {
1297                 /* an election is in progress */
1298                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1299                 goto fail;
1300         }
1301
1302         ban_misbehaving_nodes(rec, &self_ban);
1303         if (self_ban) {
1304                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1305                 goto fail;
1306         }
1307
1308         if (ctdb->recovery_lock != NULL) {
1309                 if (ctdb_recovery_have_lock(rec)) {
1310                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
1311                 } else {
1312                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
1313                                              ctdb->recovery_lock));
1314                         if (!ctdb_recovery_lock(rec)) {
1315                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1316                                         /* If ctdb is trying first recovery, it's
1317                                          * possible that current node does not know
1318                                          * yet who the recmaster is.
1319                                          */
1320                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
1321                                                           " - retrying recovery\n"));
1322                                         goto fail;
1323                                 }
1324
1325                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1326                                                  "and ban ourself for %u seconds\n",
1327                                                  ctdb->tunable.recovery_ban_period));
1328                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1329                                 goto fail;
1330                         }
1331                         DEBUG(DEBUG_NOTICE,
1332                               ("Recovery lock taken successfully by recovery daemon\n"));
1333                 }
1334         }
1335
1336         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1337
1338         /* get a list of all databases */
1339         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1340         if (ret != 0) {
1341                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1342                 goto fail;
1343         }
1344
1345         /* we do the db creation before we set the recovery mode, so the freeze happens
1346            on all databases we will be dealing with. */
1347
1348         /* verify that we have all the databases any other node has */
1349         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1350         if (ret != 0) {
1351                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1352                 goto fail;
1353         }
1354
1355         /* verify that all other nodes have all our databases */
1356         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1357         if (ret != 0) {
1358                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1359                 goto fail;
1360         }
1361         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1362
1363
1364         /* Retrieve capabilities from all connected nodes */
1365         ret = update_capabilities(rec, nodemap);
1366         if (ret!=0) {
1367                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1368                 return -1;
1369         }
1370
1371         /*
1372           update all nodes to have the same flags that we have
1373          */
1374         for (i=0;i<nodemap->num;i++) {
1375                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1376                         continue;
1377                 }
1378
1379                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1380                 if (ret != 0) {
1381                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1382                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1383                         } else {
1384                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1385                                 return -1;
1386                         }
1387                 }
1388         }
1389
1390         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1391
1392         ret = db_recovery_parallel(rec, mem_ctx);
1393         if (ret != 0) {
1394                 goto fail;
1395         }
1396
1397         do_takeover_run(rec, nodemap);
1398
1399         /* send a message to all clients telling them that the cluster 
1400            has been reconfigured */
1401         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
1402                                        CTDB_SRVID_RECONFIGURE, tdb_null);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
1405                 goto fail;
1406         }
1407
1408         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1409
1410         rec->need_recovery = false;
1411         ctdb_op_end(rec->recovery);
1412
1413         /* we managed to complete a full recovery, make sure to forgive
1414            any past sins by the nodes that could now participate in the
1415            recovery.
1416         */
1417         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1418         for (i=0;i<nodemap->num;i++) {
1419                 struct ctdb_banning_state *ban_state;
1420
1421                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1422                         continue;
1423                 }
1424
1425                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1426                 if (ban_state == NULL) {
1427                         continue;
1428                 }
1429
1430                 ban_state->count = 0;
1431         }
1432
1433         /* We just finished a recovery successfully.
1434            We now wait for rerecovery_timeout before we allow
1435            another recovery to take place.
1436         */
1437         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1438         ctdb_op_disable(rec->recovery, ctdb->ev,
1439                         ctdb->tunable.rerecovery_timeout);
1440         return 0;
1441
1442 fail:
1443         ctdb_op_end(rec->recovery);
1444         return -1;
1445 }
1446
1447
/*
  Election data sent between recovery daemons (see
  send_election_request(), which sends this struct as the raw message
  payload - do not reorder the members).

  ctdb_election_win() decides on the node flags first, then on the
  priority time and finally on the pnn.  num_connected is filled in by
  ctdb_election_data() but is not consulted by ctdb_election_win().
 */
struct election_message {
	uint32_t num_connected;		/* nodes not flagged DISCONNECTED */
	struct timeval priority_time;	/* sender's rec->priority_time */
	uint32_t pnn;			/* sender's PNN */
	uint32_t node_flags;		/* sender's own node flags */
};
1458
1459 /*
1460   form this nodes election data
1461  */
1462 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1463 {
1464         int ret, i;
1465         struct ctdb_node_map_old *nodemap;
1466         struct ctdb_context *ctdb = rec->ctdb;
1467
1468         ZERO_STRUCTP(em);
1469
1470         em->pnn = rec->ctdb->pnn;
1471         em->priority_time = rec->priority_time;
1472
1473         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1474         if (ret != 0) {
1475                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1476                 return;
1477         }
1478
1479         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1480         em->node_flags = rec->node_flags;
1481
1482         for (i=0;i<nodemap->num;i++) {
1483                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1484                         em->num_connected++;
1485                 }
1486         }
1487
1488         /* we shouldnt try to win this election if we cant be a recmaster */
1489         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1490                 em->num_connected = 0;
1491                 em->priority_time = timeval_current();
1492         }
1493
1494         talloc_free(nodemap);
1495 }
1496
1497 /*
1498   see if the given election data wins
1499  */
1500 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1501 {
1502         struct election_message myem;
1503         int cmp = 0;
1504
1505         ctdb_election_data(rec, &myem);
1506
1507         /* we cant win if we don't have the recmaster capability */
1508         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1509                 return false;
1510         }
1511
1512         /* we cant win if we are banned */
1513         if (rec->node_flags & NODE_FLAGS_BANNED) {
1514                 return false;
1515         }
1516
1517         /* we cant win if we are stopped */
1518         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1519                 return false;
1520         }
1521
1522         /* we will automatically win if the other node is banned */
1523         if (em->node_flags & NODE_FLAGS_BANNED) {
1524                 return true;
1525         }
1526
1527         /* we will automatically win if the other node is banned */
1528         if (em->node_flags & NODE_FLAGS_STOPPED) {
1529                 return true;
1530         }
1531
1532         /* then the longest running node */
1533         if (cmp == 0) {
1534                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1535         }
1536
1537         if (cmp == 0) {
1538                 cmp = (int)myem.pnn - (int)em->pnn;
1539         }
1540
1541         return cmp > 0;
1542 }
1543
1544 /*
1545   send out an election request
1546  */
1547 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1548 {
1549         int ret;
1550         TDB_DATA election_data;
1551         struct election_message emsg;
1552         uint64_t srvid;
1553         struct ctdb_context *ctdb = rec->ctdb;
1554
1555         srvid = CTDB_SRVID_ELECTION;
1556
1557         ctdb_election_data(rec, &emsg);
1558
1559         election_data.dsize = sizeof(struct election_message);
1560         election_data.dptr  = (unsigned char *)&emsg;
1561
1562
1563         /* first we assume we will win the election and set 
1564            recoverymaster to be ourself on the current node
1565          */
1566         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1567                                      CTDB_CURRENT_NODE, pnn);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1570                 return -1;
1571         }
1572         rec->recmaster = pnn;
1573
1574         /* send an election message to all active nodes */
1575         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1576         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1577 }
1578
1579 /*
1580   we think we are winning the election - send a broadcast election request
1581  */
1582 static void election_send_request(struct tevent_context *ev,
1583                                   struct tevent_timer *te,
1584                                   struct timeval t, void *p)
1585 {
1586         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1587         int ret;
1588
1589         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1590         if (ret != 0) {
1591                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1592         }
1593
1594         TALLOC_FREE(rec->send_election_te);
1595 }
1596
1597 /*
1598   handler for memory dumps
1599 */
1600 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1601 {
1602         struct ctdb_recoverd *rec = talloc_get_type(
1603                 private_data, struct ctdb_recoverd);
1604         struct ctdb_context *ctdb = rec->ctdb;
1605         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1606         TDB_DATA *dump;
1607         int ret;
1608         struct ctdb_srvid_message *rd;
1609
1610         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1611                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1612                 talloc_free(tmp_ctx);
1613                 return;
1614         }
1615         rd = (struct ctdb_srvid_message *)data.dptr;
1616
1617         dump = talloc_zero(tmp_ctx, TDB_DATA);
1618         if (dump == NULL) {
1619                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1620                 talloc_free(tmp_ctx);
1621                 return;
1622         }
1623         ret = ctdb_dump_memory(ctdb, dump);
1624         if (ret != 0) {
1625                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1626                 talloc_free(tmp_ctx);
1627                 return;
1628         }
1629
1630 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1631
1632         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1633         if (ret != 0) {
1634                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1635                 talloc_free(tmp_ctx);
1636                 return;
1637         }
1638
1639         talloc_free(tmp_ctx);
1640 }
1641
1642 /*
1643   handler for reload_nodes
1644 */
1645 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1646                                  void *private_data)
1647 {
1648         struct ctdb_recoverd *rec = talloc_get_type(
1649                 private_data, struct ctdb_recoverd);
1650
1651         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1652
1653         ctdb_load_nodes_file(rec->ctdb);
1654 }
1655
1656
1657 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1658                                         void *private_data)
1659 {
1660         struct ctdb_recoverd *rec = talloc_get_type(
1661                 private_data, struct ctdb_recoverd);
1662         struct ctdb_context *ctdb = rec->ctdb;
1663         uint32_t pnn;
1664         uint32_t *t;
1665         int len;
1666
1667         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1668                 return;
1669         }
1670
1671         if (data.dsize != sizeof(uint32_t)) {
1672                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1673                 return;
1674         }
1675
1676         pnn = *(uint32_t *)&data.dptr[0];
1677
1678         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1679
1680         /* Copy any existing list of nodes.  There's probably some
1681          * sort of realloc variant that will do this but we need to
1682          * make sure that freeing the old array also cancels the timer
1683          * event for the timeout... not sure if realloc will do that.
1684          */
1685         len = (rec->force_rebalance_nodes != NULL) ?
1686                 talloc_array_length(rec->force_rebalance_nodes) :
1687                 0;
1688
1689         /* This allows duplicates to be added but they don't cause
1690          * harm.  A call to add a duplicate PNN arguably means that
1691          * the timeout should be reset, so this is the simplest
1692          * solution.
1693          */
1694         t = talloc_zero_array(rec, uint32_t, len+1);
1695         CTDB_NO_MEMORY_VOID(ctdb, t);
1696         if (len > 0) {
1697                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1698         }
1699         t[len] = pnn;
1700
1701         talloc_free(rec->force_rebalance_nodes);
1702
1703         rec->force_rebalance_nodes = t;
1704 }
1705
1706
1707
1708 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1709                                     TDB_DATA data,
1710                                     struct ctdb_op_state *op_state)
1711 {
1712         struct ctdb_disable_message *r;
1713         uint32_t timeout;
1714         TDB_DATA result;
1715         int32_t ret = 0;
1716
1717         /* Validate input data */
1718         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1719                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1720                                  "expecting %lu\n", (long unsigned)data.dsize,
1721                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
1722                 return;
1723         }
1724         if (data.dptr == NULL) {
1725                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1726                 return;
1727         }
1728
1729         r = (struct ctdb_disable_message *)data.dptr;
1730         timeout = r->timeout;
1731
1732         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1733         if (ret != 0) {
1734                 goto done;
1735         }
1736
1737         /* Returning our PNN tells the caller that we succeeded */
1738         ret = ctdb_get_pnn(ctdb);
1739 done:
1740         result.dsize = sizeof(int32_t);
1741         result.dptr  = (uint8_t *)&ret;
1742         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1743 }
1744
1745 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1746                                           void *private_data)
1747 {
1748         struct ctdb_recoverd *rec = talloc_get_type(
1749                 private_data, struct ctdb_recoverd);
1750
1751         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1752 }
1753
1754 /* Backward compatibility for this SRVID */
1755 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1756                                      void *private_data)
1757 {
1758         struct ctdb_recoverd *rec = talloc_get_type(
1759                 private_data, struct ctdb_recoverd);
1760         uint32_t timeout;
1761
1762         if (data.dsize != sizeof(uint32_t)) {
1763                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1764                                  "expecting %lu\n", (long unsigned)data.dsize,
1765                                  (long unsigned)sizeof(uint32_t)));
1766                 return;
1767         }
1768         if (data.dptr == NULL) {
1769                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1770                 return;
1771         }
1772
1773         timeout = *((uint32_t *)data.dptr);
1774
1775         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1776 }
1777
1778 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1779                                        void *private_data)
1780 {
1781         struct ctdb_recoverd *rec = talloc_get_type(
1782                 private_data, struct ctdb_recoverd);
1783
1784         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1785 }
1786
1787 /*
1788   handler for ip reallocate, just add it to the list of requests and 
1789   handle this later in the monitor_cluster loop so we do not recurse
1790   with other requests to takeover_run()
1791 */
1792 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1793                                   void *private_data)
1794 {
1795         struct ctdb_srvid_message *request;
1796         struct ctdb_recoverd *rec = talloc_get_type(
1797                 private_data, struct ctdb_recoverd);
1798
1799         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1800                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1801                 return;
1802         }
1803
1804         request = (struct ctdb_srvid_message *)data.dptr;
1805
1806         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1807 }
1808
1809 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1810                                           struct ctdb_recoverd *rec)
1811 {
1812         TDB_DATA result;
1813         int32_t ret;
1814         struct srvid_requests *current;
1815
1816         /* Only process requests that are currently pending.  More
1817          * might come in while the takeover run is in progress and
1818          * they will need to be processed later since they might
1819          * be in response flag changes.
1820          */
1821         current = rec->reallocate_requests;
1822         rec->reallocate_requests = NULL;
1823
1824         if (do_takeover_run(rec, rec->nodemap)) {
1825                 ret = ctdb_get_pnn(ctdb);
1826         } else {
1827                 ret = -1;
1828         }
1829
1830         result.dsize = sizeof(int32_t);
1831         result.dptr  = (uint8_t *)&ret;
1832
1833         srvid_requests_reply(ctdb, &current, result);
1834 }
1835
1836 /*
1837  * handler for assigning banning credits
1838  */
1839 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1840 {
1841         struct ctdb_recoverd *rec = talloc_get_type(
1842                 private_data, struct ctdb_recoverd);
1843         uint32_t ban_pnn;
1844
1845         /* Ignore if we are not recmaster */
1846         if (rec->ctdb->pnn != rec->recmaster) {
1847                 return;
1848         }
1849
1850         if (data.dsize != sizeof(uint32_t)) {
1851                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
1852                                   data.dsize));
1853                 return;
1854         }
1855
1856         ban_pnn = *(uint32_t *)data.dptr;
1857
1858         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1859 }
1860
1861 /*
1862   handler for recovery master elections
1863 */
1864 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1865 {
1866         struct ctdb_recoverd *rec = talloc_get_type(
1867                 private_data, struct ctdb_recoverd);
1868         struct ctdb_context *ctdb = rec->ctdb;
1869         int ret;
1870         struct election_message *em = (struct election_message *)data.dptr;
1871
1872         /* Ignore election packets from ourself */
1873         if (ctdb->pnn == em->pnn) {
1874                 return;
1875         }
1876
1877         /* we got an election packet - update the timeout for the election */
1878         talloc_free(rec->election_timeout);
1879         rec->election_timeout = tevent_add_timer(
1880                         ctdb->ev, ctdb,
1881                         fast_start ?
1882                                 timeval_current_ofs(0, 500000) :
1883                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1884                         ctdb_election_timeout, rec);
1885
1886         /* someone called an election. check their election data
1887            and if we disagree and we would rather be the elected node, 
1888            send a new election message to all other nodes
1889          */
1890         if (ctdb_election_win(rec, em)) {
1891                 if (!rec->send_election_te) {
1892                         rec->send_election_te = tevent_add_timer(
1893                                         ctdb->ev, rec,
1894                                         timeval_current_ofs(0, 500000),
1895                                         election_send_request, rec);
1896                 }
1897                 return;
1898         }
1899
1900         /* we didn't win */
1901         TALLOC_FREE(rec->send_election_te);
1902
1903         /* Release the recovery lock file */
1904         if (ctdb_recovery_have_lock(rec)) {
1905                 ctdb_recovery_unlock(rec);
1906         }
1907
1908         /* ok, let that guy become recmaster then */
1909         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1910                                      CTDB_CURRENT_NODE, em->pnn);
1911         if (ret != 0) {
1912                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
1913                 return;
1914         }
1915         rec->recmaster = em->pnn;
1916
1917         return;
1918 }
1919
1920
1921 /*
1922   force the start of the election process
1923  */
1924 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1925                            struct ctdb_node_map_old *nodemap)
1926 {
1927         int ret;
1928         struct ctdb_context *ctdb = rec->ctdb;
1929
1930         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1931
1932         /* set all nodes to recovery mode to stop all internode traffic */
1933         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1934         if (ret != 0) {
1935                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1936                 return;
1937         }
1938
1939         talloc_free(rec->election_timeout);
1940         rec->election_timeout = tevent_add_timer(
1941                         ctdb->ev, ctdb,
1942                         fast_start ?
1943                                 timeval_current_ofs(0, 500000) :
1944                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1945                         ctdb_election_timeout, rec);
1946
1947         ret = send_election_request(rec, pnn);
1948         if (ret!=0) {
1949                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1950                 return;
1951         }
1952
1953         /* wait for a few seconds to collect all responses */
1954         ctdb_wait_election(rec);
1955 }
1956
1957
1958
1959 /*
1960   handler for when a node changes its flags
1961 */
1962 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1963 {
1964         struct ctdb_recoverd *rec = talloc_get_type(
1965                 private_data, struct ctdb_recoverd);
1966         struct ctdb_context *ctdb = rec->ctdb;
1967         int ret;
1968         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1969         struct ctdb_node_map_old *nodemap=NULL;
1970         TALLOC_CTX *tmp_ctx;
1971         int i;
1972
1973         if (data.dsize != sizeof(*c)) {
1974                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1975                 return;
1976         }
1977
1978         tmp_ctx = talloc_new(ctdb);
1979         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1980
1981         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1982         if (ret != 0) {
1983                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1984                 talloc_free(tmp_ctx);
1985                 return;         
1986         }
1987
1988
1989         for (i=0;i<nodemap->num;i++) {
1990                 if (nodemap->nodes[i].pnn == c->pnn) break;
1991         }
1992
1993         if (i == nodemap->num) {
1994                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1995                 talloc_free(tmp_ctx);
1996                 return;
1997         }
1998
1999         if (c->old_flags != c->new_flags) {
2000                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2001         }
2002
2003         nodemap->nodes[i].flags = c->new_flags;
2004
2005         talloc_free(tmp_ctx);
2006 }
2007
2008 /*
2009   handler for when we need to push out flag changes ot all other nodes
2010 */
2011 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2012                                void *private_data)
2013 {
2014         struct ctdb_recoverd *rec = talloc_get_type(
2015                 private_data, struct ctdb_recoverd);
2016         struct ctdb_context *ctdb = rec->ctdb;
2017         int ret;
2018         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2019         struct ctdb_node_map_old *nodemap=NULL;
2020         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2021         uint32_t *nodes;
2022
2023         /* read the node flags from the recmaster */
2024         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2025                                    tmp_ctx, &nodemap);
2026         if (ret != 0) {
2027                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2028                 talloc_free(tmp_ctx);
2029                 return;
2030         }
2031         if (c->pnn >= nodemap->num) {
2032                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2033                 talloc_free(tmp_ctx);
2034                 return;
2035         }
2036
2037         /* send the flags update to all connected nodes */
2038         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2039
2040         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2041                                       nodes, 0, CONTROL_TIMEOUT(),
2042                                       false, data,
2043                                       NULL, NULL,
2044                                       NULL) != 0) {
2045                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2046
2047                 talloc_free(tmp_ctx);
2048                 return;
2049         }
2050
2051         talloc_free(tmp_ctx);
2052 }
2053
2054
/* State shared between verify_recmode() and its per-node callback */
struct verify_recmode_normal_data {
        /* Number of getrecmode controls sent and not yet completed */
        uint32_t count;
        /* Aggregate result; MONITOR_OK until a callback reports a problem */
        enum monitor_result status;
};
2059
2060 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2061 {
2062         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2063
2064
2065         /* one more node has responded with recmode data*/
2066         rmdata->count--;
2067
2068         /* if we failed to get the recmode, then return an error and let
2069            the main loop try again.
2070         */
2071         if (state->state != CTDB_CONTROL_DONE) {
2072                 if (rmdata->status == MONITOR_OK) {
2073                         rmdata->status = MONITOR_FAILED;
2074                 }
2075                 return;
2076         }
2077
2078         /* if we got a response, then the recmode will be stored in the
2079            status field
2080         */
2081         if (state->status != CTDB_RECOVERY_NORMAL) {
2082                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2083                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2084         }
2085
2086         return;
2087 }
2088
2089
2090 /* verify that all nodes are in normal recovery mode */
2091 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2092 {
2093         struct verify_recmode_normal_data *rmdata;
2094         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2095         struct ctdb_client_control_state *state;
2096         enum monitor_result status;
2097         int j;
2098         
2099         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2100         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2101         rmdata->count  = 0;
2102         rmdata->status = MONITOR_OK;
2103
2104         /* loop over all active nodes and send an async getrecmode call to 
2105            them*/
2106         for (j=0; j<nodemap->num; j++) {
2107                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2108                         continue;
2109                 }
2110                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2111                                         CONTROL_TIMEOUT(), 
2112                                         nodemap->nodes[j].pnn);
2113                 if (state == NULL) {
2114                         /* we failed to send the control, treat this as 
2115                            an error and try again next iteration
2116                         */                      
2117                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2118                         talloc_free(mem_ctx);
2119                         return MONITOR_FAILED;
2120                 }
2121
2122                 /* set up the callback functions */
2123                 state->async.fn = verify_recmode_normal_callback;
2124                 state->async.private_data = rmdata;
2125
2126                 /* one more control to wait for to complete */
2127                 rmdata->count++;
2128         }
2129
2130
2131         /* now wait for up to the maximum number of seconds allowed
2132            or until all nodes we expect a response from has replied
2133         */
2134         while (rmdata->count > 0) {
2135                 tevent_loop_once(ctdb->ev);
2136         }
2137
2138         status = rmdata->status;
2139         talloc_free(mem_ctx);
2140         return status;
2141 }
2142
2143
/* State shared between verify_recmaster() and its per-node callback */
struct verify_recmaster_data {
        /* Recovery daemon state, used by the callback to record culprits */
        struct ctdb_recoverd *rec;
        /* Number of getrecmaster controls sent and not yet completed */
        uint32_t count;
        /* PNN that every queried node is expected to report as recmaster */
        uint32_t pnn;
        /* Aggregate result; MONITOR_OK until a callback reports a problem */
        enum monitor_result status;
};
2150
2151 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2152 {
2153         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2154
2155
2156         /* one more node has responded with recmaster data*/
2157         rmdata->count--;
2158
2159         /* if we failed to get the recmaster, then return an error and let
2160            the main loop try again.
2161         */
2162         if (state->state != CTDB_CONTROL_DONE) {
2163                 if (rmdata->status == MONITOR_OK) {
2164                         rmdata->status = MONITOR_FAILED;
2165                 }
2166                 return;
2167         }
2168
2169         /* if we got a response, then the recmaster will be stored in the
2170            status field
2171         */
2172         if (state->status != rmdata->pnn) {
2173                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2174                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2175                 rmdata->status = MONITOR_ELECTION_NEEDED;
2176         }
2177
2178         return;
2179 }
2180
2181
2182 /* verify that all nodes agree that we are the recmaster */
2183 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2184 {
2185         struct ctdb_context *ctdb = rec->ctdb;
2186         struct verify_recmaster_data *rmdata;
2187         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2188         struct ctdb_client_control_state *state;
2189         enum monitor_result status;
2190         int j;
2191         
2192         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2193         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2194         rmdata->rec    = rec;
2195         rmdata->count  = 0;
2196         rmdata->pnn    = pnn;
2197         rmdata->status = MONITOR_OK;
2198
2199         /* loop over all active nodes and send an async getrecmaster call to
2200            them*/
2201         for (j=0; j<nodemap->num; j++) {
2202                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2203                         continue;
2204                 }
2205                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2206                         continue;
2207                 }
2208                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2209                                         CONTROL_TIMEOUT(),
2210                                         nodemap->nodes[j].pnn);
2211                 if (state == NULL) {
2212                         /* we failed to send the control, treat this as 
2213                            an error and try again next iteration
2214                         */                      
2215                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2216                         talloc_free(mem_ctx);
2217                         return MONITOR_FAILED;
2218                 }
2219
2220                 /* set up the callback functions */
2221                 state->async.fn = verify_recmaster_callback;
2222                 state->async.private_data = rmdata;
2223
2224                 /* one more control to wait for to complete */
2225                 rmdata->count++;
2226         }
2227
2228
2229         /* now wait for up to the maximum number of seconds allowed
2230            or until all nodes we expect a response from has replied
2231         */
2232         while (rmdata->count > 0) {
2233                 tevent_loop_once(ctdb->ev);
2234         }
2235
2236         status = rmdata->status;
2237         talloc_free(mem_ctx);
2238         return status;
2239 }
2240
2241 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2242                                     struct ctdb_recoverd *rec)
2243 {
2244         struct ctdb_iface_list_old *ifaces = NULL;
2245         TALLOC_CTX *mem_ctx;
2246         bool ret = false;
2247
2248         mem_ctx = talloc_new(NULL);
2249
2250         /* Read the interfaces from the local node */
2251         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2252                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2253                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2254                 /* We could return an error.  However, this will be
2255                  * rare so we'll decide that the interfaces have
2256                  * actually changed, just in case.
2257                  */
2258                 talloc_free(mem_ctx);
2259                 return true;
2260         }
2261
2262         if (!rec->ifaces) {
2263                 /* We haven't been here before so things have changed */
2264                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2265                 ret = true;
2266         } else if (rec->ifaces->num != ifaces->num) {
2267                 /* Number of interfaces has changed */
2268                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2269                                      rec->ifaces->num, ifaces->num));
2270                 ret = true;
2271         } else {
2272                 /* See if interface names or link states have changed */
2273                 int i;
2274                 for (i = 0; i < rec->ifaces->num; i++) {
2275                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2276                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2277                                 DEBUG(DEBUG_NOTICE,
2278                                       ("Interface in slot %d changed: %s => %s\n",
2279                                        i, iface->name, ifaces->ifaces[i].name));
2280                                 ret = true;
2281                                 break;
2282                         }
2283                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2284                                 DEBUG(DEBUG_NOTICE,
2285                                       ("Interface %s changed state: %d => %d\n",
2286                                        iface->name, iface->link_state,
2287                                        ifaces->ifaces[i].link_state));
2288                                 ret = true;
2289                                 break;
2290                         }
2291                 }
2292         }
2293
2294         talloc_free(rec->ifaces);
2295         rec->ifaces = talloc_steal(rec, ifaces);
2296
2297         talloc_free(mem_ctx);
2298         return ret;
2299 }
2300
2301 /* Check that the local allocation of public IP addresses is correct
2302  * and do some house-keeping */
2303 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2304                                       struct ctdb_recoverd *rec,
2305                                       uint32_t pnn,
2306                                       struct ctdb_node_map_old *nodemap)
2307 {
2308         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2309         int ret, j;
2310         bool need_takeover_run = false;
2311         struct ctdb_public_ip_list_old *ips = NULL;
2312
2313         /* If we are not the recmaster then do some housekeeping */
2314         if (rec->recmaster != pnn) {
2315                 /* Ignore any IP reallocate requests - only recmaster
2316                  * processes them
2317                  */
2318                 TALLOC_FREE(rec->reallocate_requests);
2319                 /* Clear any nodes that should be force rebalanced in
2320                  * the next takeover run.  If the recovery master role
2321                  * has moved then we don't want to process these some
2322                  * time in the future.
2323                  */
2324                 TALLOC_FREE(rec->force_rebalance_nodes);
2325         }
2326
2327         /* Return early if disabled... */
2328         if (ctdb->tunable.disable_ip_failover != 0 ||
2329             ctdb_op_is_disabled(rec->takeover_run)) {
2330                 return  0;
2331         }
2332
2333         if (interfaces_have_changed(ctdb, rec)) {
2334                 need_takeover_run = true;
2335         }
2336
2337         /* If there are unhosted IPs but this node can host them then
2338          * trigger an IP reallocation */
2339
2340         /* Read *available* IPs from local node */
2341         ret = ctdb_ctrl_get_public_ips_flags(
2342                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2343                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2344         if (ret != 0) {
2345                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2346                 talloc_free(mem_ctx);
2347                 return -1;
2348         }
2349
2350         for (j=0; j<ips->num; j++) {
2351                 if (ips->ips[j].pnn == -1 &&
2352                     nodemap->nodes[pnn].flags == 0) {
2353                         DEBUG(DEBUG_WARNING,
2354                               ("Unassigned IP %s can be served by this node\n",
2355                                ctdb_addr_to_str(&ips->ips[j].addr)));
2356                         need_takeover_run = true;
2357                 }
2358         }
2359
2360         talloc_free(ips);
2361
2362         if (!ctdb->do_checkpublicip) {
2363                 goto done;
2364         }
2365
2366         /* Validate the IP addresses that this node has on network
2367          * interfaces.  If there is an inconsistency between reality
2368          * and the state expected by CTDB then try to fix it by
2369          * triggering an IP reallocation or releasing extraneous IP
2370          * addresses. */
2371
2372         /* Read *known* IPs from local node */
2373         ret = ctdb_ctrl_get_public_ips_flags(
2374                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2375         if (ret != 0) {
2376                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2377                 talloc_free(mem_ctx);
2378                 return -1;
2379         }
2380
2381         for (j=0; j<ips->num; j++) {
2382                 if (ips->ips[j].pnn == pnn) {
2383                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2384                                 DEBUG(DEBUG_ERR,
2385                                       ("Assigned IP %s not on an interface\n",
2386                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2387                                 need_takeover_run = true;
2388                         }
2389                 } else {
2390                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2391                                 DEBUG(DEBUG_ERR,
2392                                       ("IP %s incorrectly on an interface\n",
2393                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2394                                 need_takeover_run = true;
2395                         }
2396                 }
2397         }
2398
2399 done:
2400         if (need_takeover_run) {
2401                 struct ctdb_srvid_message rd;
2402                 TDB_DATA data;
2403
2404                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2405
2406                 ZERO_STRUCT(rd);
2407                 rd.pnn = ctdb->pnn;
2408                 rd.srvid = 0;
2409                 data.dptr = (uint8_t *)&rd;
2410                 data.dsize = sizeof(rd);
2411
2412                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2413                 if (ret != 0) {
2414                         DEBUG(DEBUG_ERR,
2415                               ("Failed to send takeover run request\n"));
2416                 }
2417         }
2418         talloc_free(mem_ctx);
2419         return 0;
2420 }
2421
2422
2423 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2424 {
2425         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2426
2427         if (node_pnn >= ctdb->num_nodes) {
2428                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2429                 return;
2430         }
2431
2432         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2433
2434 }
2435
2436 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2437         struct ctdb_node_map_old *nodemap,
2438         struct ctdb_node_map_old **remote_nodemaps)
2439 {
2440         uint32_t *nodes;
2441
2442         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2443         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2444                                         nodes, 0,
2445                                         CONTROL_TIMEOUT(), false, tdb_null,
2446                                         async_getnodemap_callback,
2447                                         NULL,
2448                                         remote_nodemaps) != 0) {
2449                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2450
2451                 return -1;
2452         }
2453
2454         return 0;
2455 }
2456
2457 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2458                                      TALLOC_CTX *mem_ctx)
2459 {
2460         struct ctdb_context *ctdb = rec->ctdb;
2461         uint32_t pnn = ctdb_get_pnn(ctdb);
2462         struct ctdb_node_map_old *nodemap = rec->nodemap;
2463         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2464         int ret;
2465
2466         /* When recovery daemon is started, recmaster is set to
2467          * "unknown" so it knows to start an election.
2468          */
2469         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2470                 DEBUG(DEBUG_NOTICE,
2471                       ("Initial recovery master set - forcing election\n"));
2472                 force_election(rec, pnn, nodemap);
2473                 return false;
2474         }
2475
2476         /*
2477          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2478          * but we have, then force an election and try to become the new
2479          * recmaster.
2480          */
2481         if (!ctdb_node_has_capabilities(rec->caps,
2482                                         rec->recmaster,
2483                                         CTDB_CAP_RECMASTER) &&
2484             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2485             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2486                 DEBUG(DEBUG_ERR,
2487                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2488                        " but we (node %u) have - force an election\n",
2489                        rec->recmaster, pnn));
2490                 force_election(rec, pnn, nodemap);
2491                 return false;
2492         }
2493
2494         /* Verify that the master node has not been deleted.  This
2495          * should not happen because a node should always be shutdown
2496          * before being deleted, causing a new master to be elected
2497          * before now.  However, if something strange has happened
2498          * then checking here will ensure we don't index beyond the
2499          * end of the nodemap array. */
2500         if (rec->recmaster >= nodemap->num) {
2501                 DEBUG(DEBUG_ERR,
2502                       ("Recmaster node %u has been deleted. Force election\n",
2503                        rec->recmaster));
2504                 force_election(rec, pnn, nodemap);
2505                 return false;
2506         }
2507
2508         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2509         if (nodemap->nodes[rec->recmaster].flags &
2510             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2511                 DEBUG(DEBUG_NOTICE,
2512                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2513                        rec->recmaster));
2514                 force_election(rec, pnn, nodemap);
2515                 return false;
2516         }
2517
2518         /* get nodemap from the recovery master to check if it is inactive */
2519         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2520                                    mem_ctx, &recmaster_nodemap);
2521         if (ret != 0) {
2522                 DEBUG(DEBUG_ERR,
2523                       (__location__
2524                        " Unable to get nodemap from recovery master %u\n",
2525                           rec->recmaster));
2526                 /* No election, just error */
2527                 return false;
2528         }
2529
2530
2531         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2532             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2533                 DEBUG(DEBUG_NOTICE,
2534                       ("Recmaster node %u is inactive. Force election\n",
2535                        rec->recmaster));
2536                 /*
2537                  * update our nodemap to carry the recmaster's notion of
2538                  * its own flags, so that we don't keep freezing the
2539                  * inactive recmaster node...
2540                  */
2541                 nodemap->nodes[rec->recmaster].flags =
2542                         recmaster_nodemap->nodes[rec->recmaster].flags;
2543                 force_election(rec, pnn, nodemap);
2544                 return false;
2545         }
2546
2547         return true;
2548 }
2549
/*
 * One pass of the recovery daemon's monitoring loop.  monitor_cluster()
 * calls this repeatedly, each time with a fresh temporary talloc
 * context (mem_ctx) that it frees afterwards.
 *
 * Outline of a pass:
 *  - exit if the main daemon is gone, otherwise ping it;
 *  - do nothing more while an election timeout is pending;
 *  - refresh debug level, tunables, runstate, nodemap and recovery mode
 *    from the local daemon;
 *  - if this node is stopped or banned: make sure recovery mode is
 *    active and the node is frozen, then stop;
 *  - update capabilities, validate the recovery master and, in normal
 *    recovery mode, verify the local IP allocation;
 *  - only on the recovery master: cross-check nodemaps, node flags and
 *    vnnmaps of all active nodes and call do_recovery() on the first
 *    inconsistency (after naming a culprit where one is identified);
 *  - finally process pending IP takeover requests unless takeover runs
 *    are disabled.
 *
 * There is no return value: every failure path simply returns and the
 * next pass starts again from the top.
 */
2550 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2551                       TALLOC_CTX *mem_ctx)
2552 {
2553         uint32_t pnn;
2554         struct ctdb_node_map_old *nodemap=NULL;
2555         struct ctdb_node_map_old **remote_nodemaps=NULL;
2556         struct ctdb_vnn_map *vnnmap=NULL;
2557         struct ctdb_vnn_map *remote_vnnmap=NULL;
2558         uint32_t num_lmasters;
2559         int32_t debug_level;
2560         int i, j, ret;
2561         bool self_ban;
2562
2563
2564         /* verify that the main daemon is still running */
2565         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2566                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2567                 exit(-1);
2568         }
2569
2570         /* ping the local daemon to tell it we are alive */
2571         ctdb_ctrl_recd_ping(ctdb);
2572
2573         if (rec->election_timeout) {
2574                 /* an election is in progress */
2575                 return;
2576         }
2577
2578         /* read the debug level from the parent and update locally */
2579         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2580         if (ret !=0) {
2581                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2582                 return;
2583         }
2584         DEBUGLEVEL = debug_level;
2585
2586         /* get relevant tunables */
2587         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2588         if (ret != 0) {
2589                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2590                 return;
2591         }
2592
2593         /* get runstate */
2594         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2595                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2596         if (ret != 0) {
2597                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2598                 return;
2599         }
2600
2601         pnn = ctdb_get_pnn(ctdb);
2602
2603         /* get nodemap */
        /* the previous pass's nodemap is dropped first; the new one is
         * allocated on rec, so it outlives this pass's mem_ctx */
2604         TALLOC_FREE(rec->nodemap);
2605         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2606         if (ret != 0) {
2607                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2608                 return;
2609         }
2610         nodemap = rec->nodemap;
2611
2612         /* remember our own node flags */
2613         rec->node_flags = nodemap->nodes[pnn].flags;
2614
2615         ban_misbehaving_nodes(rec, &self_ban);
2616         if (self_ban) {
2617                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2618                 return;
2619         }
2620
2621         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2622                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2623         if (ret != 0) {
2624                 D_ERR("Failed to read recmode from local node\n");
2625                 return;
2626         }
2627
2628         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2629            also frozen and that the recmode is set to active.
2630         */
2631         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2632                 /* If this node has become inactive then we want to
2633                  * reduce the chances of it taking over the recovery
2634                  * master role when it becomes active again.  This
2635                  * helps to stabilise the recovery master role so that
2636                  * it stays on the most stable node.
2637                  */
2638                 rec->priority_time = timeval_current();
2639
2640                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2641                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2642
2643                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2644                         if (ret != 0) {
2645                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2646
2647                                 return;
2648                         }
2649                 }
                /* frozen_on_inactive remembers a successful freeze so
                 * the control is not re-sent on every pass */
2650                 if (! rec->frozen_on_inactive) {
2651                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2652                                                CTDB_CURRENT_NODE);
2653                         if (ret != 0) {
2654                                 DEBUG(DEBUG_ERR,
2655                                       (__location__ " Failed to freeze node "
2656                                        "in STOPPED or BANNED state\n"));
2657                                 return;
2658                         }
2659
2660                         rec->frozen_on_inactive = true;
2661                 }
2662
2663                 /* If this node is stopped or banned then it is not the recovery
2664                  * master, so don't do anything. This prevents stopped or banned
2665                  * node from starting election and sending unnecessary controls.
2666                  */
2667                 return;
2668         }
2669
2670         rec->frozen_on_inactive = false;
2671
2672         /* Retrieve capabilities from all connected nodes */
2673         ret = update_capabilities(rec, nodemap);
2674         if (ret != 0) {
2675                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2676                 return;
2677         }
2678
2679         if (! validate_recovery_master(rec, mem_ctx)) {
2680                 return;
2681         }
2682
2683         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2684                 /* Check if an IP takeover run is needed and trigger one if
2685                  * necessary */
2686                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2687         }
2688
2689         /* if we are not the recmaster then we do not need to check
2690            if recovery is needed
2691          */
2692         if (pnn != rec->recmaster) {
2693                 return;
2694         }
2695
2696
2697         /* ensure our local copies of flags are right */
2698         ret = update_local_flags(rec, nodemap);
2699         if (ret != 0) {
2700                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2701                 return;
2702         }
2703
        /* The locally loaded node list and the daemon's nodemap differ
         * in size: reload the nodes file and start over next pass */
2704         if (ctdb->num_nodes != nodemap->num) {
2705                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2706                 ctdb_load_nodes_file(ctdb);
2707                 return;
2708         }
2709
2710         /* verify that all active nodes agree that we are the recmaster */
2711         switch (verify_recmaster(rec, nodemap, pnn)) {
2712         case MONITOR_RECOVERY_NEEDED:
2713                 /* can not happen */
2714                 return;
2715         case MONITOR_ELECTION_NEEDED:
2716                 force_election(rec, pnn, nodemap);
2717                 return;
2718         case MONITOR_OK:
2719                 break;
2720         case MONITOR_FAILED:
2721                 return;
2722         }
2723
2724
2725         /* get the vnnmap */
2726         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2727         if (ret != 0) {
2728                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2729                 return;
2730         }
2731
2732         if (rec->need_recovery) {
2733                 /* a previous recovery didn't finish */
2734                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2735                 return;
2736         }
2737
2738         /* verify that all active nodes are in normal mode 
2739            and not in recovery mode 
2740         */
2741         switch (verify_recmode(ctdb, nodemap)) {
2742         case MONITOR_RECOVERY_NEEDED:
2743                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2744                 return;
2745         case MONITOR_FAILED:
2746                 return;
2747         case MONITOR_ELECTION_NEEDED:
2748                 /* can not happen; deliberately falls through to MONITOR_OK */
2749         case MONITOR_OK:
2750                 break;
2751         }
2752
2753
2754         if (ctdb->recovery_lock != NULL) {
2755                 /* We must already hold the recovery lock */
2756                 if (!ctdb_recovery_have_lock(rec)) {
2757                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2758                         ctdb_set_culprit(rec, ctdb->pnn);
2759                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2760                         return;
2761                 }
2762         }
2763
2764
2765         /* If recoveries are disabled then there is no use doing any
2766          * nodemap or flags checks.  Recoveries might be disabled due
2767          * to "reloadnodes", so doing these checks might cause an
2768          * unnecessary recovery.  */
2769         if (ctdb_op_is_disabled(rec->recovery)) {
2770                 goto takeover_run_checks;
2771         }
2772
2773         /* get the nodemap for all active remote nodes
2774          */
2775         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2776         if (remote_nodemaps == NULL) {
2777                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2778                 return;
2779         }
        /* start from all-NULL so that missing replies can be detected
         * in the verification loop below */
2780         for(i=0; i<nodemap->num; i++) {
2781                 remote_nodemaps[i] = NULL;
2782         }
2783         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2784                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2785                 return;
2786         } 
2787
2788         /* verify that all other nodes have the same nodemap as we have
2789         */
2790         for (j=0; j<nodemap->num; j++) {
2791                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2792                         continue;
2793                 }
2794
2795                 if (remote_nodemaps[j] == NULL) {
2796                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2797                         ctdb_set_culprit(rec, j);
2798
2799                         return;
2800                 }
2801
2802                 /* if the nodes disagree on how many nodes there are
2803                    then this is a good reason to try recovery
2804                  */
2805                 if (remote_nodemaps[j]->num != nodemap->num) {
2806                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2807                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2808                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2809                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2810                         return;
2811                 }
2812
2813                 /* if the nodes disagree on which nodes exist and are
2814                    active, then that is also a good reason to do recovery
2815                  */
2816                 for (i=0;i<nodemap->num;i++) {
2817                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2818                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2819                                           nodemap->nodes[j].pnn, i, 
2820                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2821                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2822                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2823                                             vnnmap);
2824                                 return;
2825                         }
2826                 }
2827         }
2828
2829         /*
2830          * Update node flags obtained from each active node. This ensure we have
2831          * up-to-date information for all the nodes.
2832          */
2833         for (j=0; j<nodemap->num; j++) {
2834                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2835                         continue;
2836                 }
2837                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2838         }
2839
        /* Compare every active node's view of all connected nodes with
         * the (just refreshed) local view.
         * NOTE(review): the loop above has already copied each still
         * active node j's own flags into nodemap, so the "i == j"
         * branch below looks unreachable - confirm before relying on
         * it. */
2840         for (j=0; j<nodemap->num; j++) {
2841                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2842                         continue;
2843                 }
2844
2845                 /* verify the flags are consistent
2846                 */
2847                 for (i=0; i<nodemap->num; i++) {
2848                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2849                                 continue;
2850                         }
2851                         
2852                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2853                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2854                                   nodemap->nodes[j].pnn, 
2855                                   nodemap->nodes[i].pnn, 
2856                                   remote_nodemaps[j]->nodes[i].flags,
2857                                   nodemap->nodes[i].flags));
2858                                 if (i == j) {
2859                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2860                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2861                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2862                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2863                                                     vnnmap);
2864                                         return;
2865                                 } else {
2866                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2867                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2868                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2869                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2870                                                     vnnmap);
2871                                         return;
2872                                 }
2873                         }
2874                 }
2875         }
2876
2877
2878         /* count how many active nodes there are */
        /* (only those with the LMASTER capability are counted; indexing
         * ctdb->nodes[] by i relies on the earlier check that
         * ctdb->num_nodes equals nodemap->num) */
2879         num_lmasters  = 0;
2880         for (i=0; i<nodemap->num; i++) {
2881                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2882                         if (ctdb_node_has_capabilities(rec->caps,
2883                                                        ctdb->nodes[i]->pnn,
2884                                                        CTDB_CAP_LMASTER)) {
2885                                 num_lmasters++;
2886                         }
2887                 }
2888         }
2889
2890
2891         /* There must be the same number of lmasters in the vnn map as
2892          * there are active nodes with the lmaster capability...  or
2893          * do a recovery.
2894          */
2895         if (vnnmap->size != num_lmasters) {
2896                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2897                           vnnmap->size, num_lmasters));
2898                 ctdb_set_culprit(rec, ctdb->pnn);
2899                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2900                 return;
2901         }
2902
2903         /* verify that all active nodes in the nodemap also exist in 
2904            the vnnmap.
2905          */
2906         for (j=0; j<nodemap->num; j++) {
2907                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2908                         continue;
2909                 }
2910                 if (nodemap->nodes[j].pnn == pnn) {
2911                         continue;
2912                 }
2913
2914                 for (i=0; i<vnnmap->size; i++) {
2915                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2916                                 break;
2917                         }
2918                 }
                /* i reached vnnmap->size: the search found no entry */
2919                 if (i == vnnmap->size) {
2920                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2921                                   nodemap->nodes[j].pnn));
2922                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2923                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2924                         return;
2925                 }
2926         }
2927
2928         
2929         /* verify that all other nodes have the same vnnmap
2930            and are from the same generation
2931          */
2932         for (j=0; j<nodemap->num; j++) {
2933                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2934                         continue;
2935                 }
2936                 if (nodemap->nodes[j].pnn == pnn) {
2937                         continue;
2938                 }
2939
2940                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2941                                           mem_ctx, &remote_vnnmap);
2942                 if (ret != 0) {
2943                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2944                                   nodemap->nodes[j].pnn));
2945                         return;
2946                 }
2947
2948                 /* verify the vnnmap generation is the same */
2949                 if (vnnmap->generation != remote_vnnmap->generation) {
2950                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2951                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2953                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2954                         return;
2955                 }
2956
2957                 /* verify the vnnmap size is the same */
2958                 if (vnnmap->size != remote_vnnmap->size) {
2959                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2960                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2961                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2962                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2963                         return;
2964                 }
2965
2966                 /* verify the vnnmap is the same */
2967                 for (i=0;i<vnnmap->size;i++) {
2968                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2969                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2970                                           nodemap->nodes[j].pnn));
2971                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2972                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2973                                             vnnmap);
2974                                 return;
2975                         }
2976                 }
2977         }
2978
2979         /* FIXME: Add remote public IP checking to ensure that nodes
2980          * have the IP addresses that are allocated to them. */
2981
2982 takeover_run_checks:
2983
2984         /* If there are IP takeover runs requested or the previous one
2985          * failed then perform one and notify the waiters */
2986         if (!ctdb_op_is_disabled(rec->takeover_run) &&
2987             (rec->reallocate_requests || rec->need_takeover_run)) {
2988                 process_ipreallocate_requests(ctdb, rec);
2989         }
2990 }
2991
2992 static void recd_sig_term_handler(struct tevent_context *ev,
2993                                   struct tevent_signal *se, int signum,
2994                                   int count, void *dont_care,
2995                                   void *private_data)
2996 {
2997         struct ctdb_recoverd *rec = talloc_get_type_abort(
2998                 private_data, struct ctdb_recoverd);
2999
3000         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
3001         ctdb_recovery_unlock(rec);
3002         exit(0);
3003 }
3004
3005
3006 /*
3007   the main monitoring loop
3008  */
3009 static void monitor_cluster(struct ctdb_context *ctdb)
3010 {
3011         struct tevent_signal *se;
3012         struct ctdb_recoverd *rec;
3013
3014         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3015
3016         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3017         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3018
3019         rec->ctdb = ctdb;
3020         rec->recmaster = CTDB_UNKNOWN_PNN;
3021         rec->recovery_lock_handle = NULL;
3022
3023         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3024         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3025
3026         rec->recovery = ctdb_op_init(rec, "recoveries");
3027         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3028
3029         rec->priority_time = timeval_current();
3030         rec->frozen_on_inactive = false;
3031
3032         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3033                                recd_sig_term_handler, rec);
3034         if (se == NULL) {
3035                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3036                 exit(1);
3037         }
3038
3039         /* register a message port for sending memory dumps */
3040         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3041
3042         /* when a node is assigned banning credits */
3043         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3044                                         banning_handler, rec);
3045
3046         /* register a message port for recovery elections */
3047         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3048
3049         /* when nodes are disabled/enabled */
3050         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3051
3052         /* when we are asked to puch out a flag change */
3053         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3054
3055         /* register a message port for vacuum fetch */
3056         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3057
3058         /* register a message port for reloadnodes  */
3059         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3060
3061         /* register a message port for performing a takeover run */
3062         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3063
3064         /* register a message port for disabling the ip check for a short while */
3065         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3066
3067         /* register a message port for forcing a rebalance of a node next
3068            reallocation */
3069         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3070
3071         /* Register a message port for disabling takeover runs */
3072         ctdb_client_set_message_handler(ctdb,
3073                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3074                                         disable_takeover_runs_handler, rec);
3075
3076         /* Register a message port for disabling recoveries */
3077         ctdb_client_set_message_handler(ctdb,
3078                                         CTDB_SRVID_DISABLE_RECOVERIES,
3079                                         disable_recoveries_handler, rec);
3080
3081         /* register a message port for detaching database */
3082         ctdb_client_set_message_handler(ctdb,
3083                                         CTDB_SRVID_DETACH_DATABASE,
3084                                         detach_database_handler, rec);
3085
3086         for (;;) {
3087                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3088                 struct timeval start;
3089                 double elapsed;
3090
3091                 if (!mem_ctx) {
3092                         DEBUG(DEBUG_CRIT,(__location__
3093                                           " Failed to create temp context\n"));
3094                         exit(-1);
3095                 }
3096
3097                 start = timeval_current();
3098                 main_loop(ctdb, rec, mem_ctx);
3099                 talloc_free(mem_ctx);
3100
3101                 /* we only check for recovery once every second */
3102                 elapsed = timeval_elapsed(&start);
3103                 if (elapsed < ctdb->tunable.recover_interval) {
3104                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3105                                           - elapsed);
3106                 }
3107         }
3108 }
3109
3110 /*
3111   event handler for when the main ctdbd dies
3112  */
3113 static void ctdb_recoverd_parent(struct tevent_context *ev,
3114                                  struct tevent_fd *fde,
3115                                  uint16_t flags, void *private_data)
3116 {
3117         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3118         _exit(1);
3119 }
3120
3121 /*
3122   called regularly to verify that the recovery daemon is still running
3123  */
3124 static void ctdb_check_recd(struct tevent_context *ev,
3125                             struct tevent_timer *te,
3126                             struct timeval yt, void *p)
3127 {
3128         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3129
3130         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3131                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3132
3133                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3134                                  ctdb_restart_recd, ctdb);
3135
3136                 return;
3137         }
3138
3139         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3140                          timeval_current_ofs(30, 0),
3141                          ctdb_check_recd, ctdb);
3142 }
3143
3144 static void recd_sig_child_handler(struct tevent_context *ev,
3145                                    struct tevent_signal *se, int signum,
3146                                    int count, void *dont_care,
3147                                    void *private_data)
3148 {
3149 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3150         int status;
3151         pid_t pid = -1;
3152
3153         while (pid != 0) {
3154                 pid = waitpid(-1, &status, WNOHANG);
3155                 if (pid == -1) {
3156                         if (errno != ECHILD) {
3157                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3158                         }
3159                         return;
3160                 }
3161                 if (pid > 0) {
3162                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3163                 }
3164         }
3165 }
3166
3167 /*
3168   startup the recovery daemon as a child of the main ctdb daemon
3169  */
3170 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3171 {
3172         int fd[2];
3173         struct tevent_signal *se;
3174         struct tevent_fd *fde;
3175         int ret;
3176
3177         if (pipe(fd) != 0) {
3178                 return -1;
3179         }
3180
3181         ctdb->recoverd_pid = ctdb_fork(ctdb);
3182         if (ctdb->recoverd_pid == -1) {
3183                 return -1;
3184         }
3185
3186         if (ctdb->recoverd_pid != 0) {
3187                 talloc_free(ctdb->recd_ctx);
3188                 ctdb->recd_ctx = talloc_new(ctdb);
3189                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3190
3191                 close(fd[0]);
3192                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3193                                  timeval_current_ofs(30, 0),
3194                                  ctdb_check_recd, ctdb);
3195                 return 0;
3196         }
3197
3198         close(fd[1]);
3199
3200         srandom(getpid() ^ time(NULL));
3201
3202         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3203         if (ret != 0) {
3204                 return -1;
3205         }
3206
3207         prctl_set_comment("ctdb_recovered");
3208         if (switch_from_server_to_client(ctdb) != 0) {
3209                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3210                 exit(1);
3211         }
3212
3213         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3214
3215         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3216                             ctdb_recoverd_parent, &fd[0]);
3217         tevent_fd_set_auto_close(fde);
3218
3219         /* set up a handler to pick up sigchld */
3220         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3221                                recd_sig_child_handler, ctdb);
3222         if (se == NULL) {
3223                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3224                 exit(1);
3225         }
3226
3227         monitor_cluster(ctdb);
3228
3229         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3230         return -1;
3231 }
3232
3233 /*
3234   shutdown the recovery daemon
3235  */
3236 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3237 {
3238         if (ctdb->recoverd_pid == 0) {
3239                 return;
3240         }
3241
3242         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3243         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3244
3245         TALLOC_FREE(ctdb->recd_ctx);
3246         TALLOC_FREE(ctdb->recd_ping_count);
3247 }
3248
3249 static void ctdb_restart_recd(struct tevent_context *ev,
3250                               struct tevent_timer *te,
3251                               struct timeval t, void *private_data)
3252 {
3253         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3254
3255         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3256         ctdb_stop_recoverd(ctdb);
3257         ctdb_start_recoverd(ctdb);
3258 }