/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"

#include <popt.h>
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>

#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
#include "lib/util/sys_rw.h"
#include "lib/util/util_process.h"

#include "ctdb_private.h"
#include "ctdb_client.h"

#include "common/system_socket.h"
#include "common/common.h"
#include "common/logging.h"

#include "server/ctdb_config.h"

#include "ctdb_cluster_mutex.h"

/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct ctdb_srvid_message *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct ctdb_srvid_message *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        if (*requests == NULL) {
                return;
        }

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct ctdb_srvid_message *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}
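
/* Usage sketch (illustrative only, not compiled): a "reallocate"
 * request arrives with a reply srvid, is queued, and is answered once
 * the next takeover run has finished:
 *
 *   srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *   ...
 *   int32_t ret = 0;   // takeover run result, 0 on success
 *   TDB_DATA result = { .dptr = (uint8_t *)&ret, .dsize = sizeof(ret) };
 *   srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 */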

/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        struct tevent_timer *timer;
        bool in_progress;
        const char *name;
};

static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
        struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);

        if (state != NULL) {
                state->in_progress = false;
                state->name = name;
        }

        return state;
}

static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
        return state->timer != NULL;
}

static bool ctdb_op_begin(struct ctdb_op_state *state)
{
        if (ctdb_op_is_disabled(state)) {
                DEBUG(DEBUG_NOTICE,
                      ("Unable to begin - %s are disabled\n", state->name));
                return false;
        }

        state->in_progress = true;
        return true;
}

static bool ctdb_op_end(struct ctdb_op_state *state)
{
        state->in_progress = false;
        return state->in_progress;
}

static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
        return state->in_progress;
}

static void ctdb_op_enable(struct ctdb_op_state *state)
{
        TALLOC_FREE(state->timer);
}

static void ctdb_op_timeout_handler(struct tevent_context *ev,
                                    struct tevent_timer *te,
                                    struct timeval t, void *p)
{
        struct ctdb_op_state *state =
                talloc_get_type(p, struct ctdb_op_state);

        DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
        ctdb_op_enable(state);
}

static int ctdb_op_disable(struct ctdb_op_state *state,
                           struct tevent_context *ev,
                           uint32_t timeout)
{
        if (timeout == 0) {
                DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
                ctdb_op_enable(state);
                return 0;
        }

        if (state->in_progress) {
                DEBUG(DEBUG_ERR,
                      ("Unable to disable %s - in progress\n", state->name));
                return -EAGAIN;
        }

        DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
                            state->name, timeout));

        /* Clear any old timers */
        talloc_free(state->timer);

        /* Arrange for the timeout to occur */
        state->timer = tevent_add_timer(ev, state,
                                        timeval_current_ofs(timeout, 0),
                                        ctdb_op_timeout_handler, state);
        if (state->timer == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
                return -ENOMEM;
        }

        return 0;
}
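
/* Usage sketch (illustrative only, not compiled): a SRVID handler can
 * disable takeover runs for 60 seconds, and the main loop brackets each
 * run with begin/end so runs are rejected while disabled:
 *
 *   ctdb_op_disable(rec->takeover_run, ctdb->ev, 60);
 *   ...
 *   if (!ctdb_op_begin(rec->takeover_run)) {
 *           return;   // takeover runs are currently disabled
 *   }
 *   ... perform the takeover run ...
 *   ctdb_op_end(rec->takeover_run);
 */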

struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

struct ctdb_recovery_lock_handle;

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t last_culprit_node;
        struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct tevent_timer *send_election_te;
        struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_iface_list_old *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
        bool frozen_on_inactive;
        struct ctdb_recovery_lock_handle *recovery_lock_handle;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct tevent_context *ev,
                              struct tevent_timer *te, struct timeval t,
                              void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_state bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %u\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the troublemaker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        /* culprit is used to index ctdb->nodes[], so it must be strictly
           less than num_nodes */
        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %u\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved, so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}
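
/* Banning-credit sketch (illustrative only): every misbehaviour adds
 * credits to the culprit's ban_state, and ban_misbehaving_nodes()
 * below bans a node once its count reaches 2 * ctdb->num_nodes.
 * Credits are forgiven again once the node behaves for longer than the
 * recovery_grace_period tunable:
 *
 *   ctdb_set_culprit(rec, pnn);           // add one credit
 *   ctdb_set_culprit_count(rec, pnn, n);  // add n credits at once
 */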

/*
  remember the troublemaker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}

/*
  Retrieve capabilities from all connected nodes
 */
static int update_capabilities(struct ctdb_recoverd *rec,
                               struct ctdb_node_map_old *nodemap)
{
        uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node_capabilities *caps;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        caps = ctdb_get_capabilities(ctdb, tmp_ctx,
                                     CONTROL_TIMEOUT(), nodemap);

        if (caps == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Failed to get node capabilities\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
        if (capp == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__
                       " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
        ctdb->capabilities = *capp;

        TALLOC_FREE(rec->caps);
        rec->caps = talloc_steal(rec, caps);

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
                             uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}
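
/* Usage sketch (illustrative only, not compiled): recovery ends by
 * broadcasting the normal recovery mode to all active nodes in a
 * single async round trip:
 *
 *   ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
 */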

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].db_id, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].db_id, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
                                                 mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}

/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/**
 * Process one element of the vacuum fetch list:
 * Migrate it over to us with the special flag
 * CTDB_CALL_FLAG_VACUUM_MIGRATION.
 */
static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
                                     uint32_t pnn,
                                     struct ctdb_rec_data_old *r)
{
        struct ctdb_client_call_state *state;
        TDB_DATA data;
        struct ctdb_ltdb_header *hdr;
        struct ctdb_call call;

        ZERO_STRUCT(call);
        call.call_id = CTDB_NULL_FUNC;
        call.flags = CTDB_IMMEDIATE_MIGRATION;
        call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

        call.key.dptr = &r->data[0];
        call.key.dsize = r->keylen;

        /* ensure we don't block this daemon - just skip a record if we can't get
           the chainlock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
                return true;
        }

        data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
        if (data.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (hdr->dmaster == pnn) {
                /* it's already local */
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        free(data.dptr);

        state = ctdb_call_send(ctdb_db, &call);
        tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                return false;
        }
        state->async.fn = vacuum_fetch_callback;
        state->async.private_data = NULL;

        return true;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
                                 void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
        uint8_t db_flags = 0;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;

        recs = (struct ctdb_marshall_buffer *)data.dptr;

        if (recs->count == 0) {
                goto done;
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                goto done;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
                        db_flags = dbmap->dbs[i].flags;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                goto done;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                goto done;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
        }

        r = (struct ctdb_rec_data_old *)&recs->data[0];
        while (recs->count) {
                bool ok;

                ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
                if (!ok) {
                        break;
                }

                r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
                recs->count--;
        }

done:
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(uint64_t srvid, TDB_DATA data,
                                    void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t db_id;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval t, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
                         ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct tevent_context *ev,
                                  struct tevent_timer *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map_old *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return -1;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return -1;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return 0;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}

static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
{
        return (rec->recovery_lock_handle != NULL);
}

struct ctdb_recovery_lock_handle {
        bool done;
        bool locked;
        double latency;
        struct ctdb_cluster_mutex_handle *h;
};

static void take_reclock_handler(char status,
                                 double latency,
                                 void *private_data)
{
        struct ctdb_recovery_lock_handle *s =
                (struct ctdb_recovery_lock_handle *) private_data;

        s->locked = (status == '0');

        /*
         * If unsuccessful then ensure the process has exited and that
         * the file descriptor event handler has been cancelled
         */
        if (! s->locked) {
                TALLOC_FREE(s->h);
        }

        switch (status) {
        case '0':
                s->latency = latency;
                break;

        case '1':
                DEBUG(DEBUG_ERR,
                      ("Unable to take recovery lock - contention\n"));
                break;

        default:
                DEBUG(DEBUG_ERR, ("Unexpected error when taking recovery lock\n"));
        }

        s->done = true;
}

static void force_election(struct ctdb_recoverd *rec,
                           uint32_t pnn,
                           struct ctdb_node_map_old *nodemap);

static void lost_reclock_handler(void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type_abort(
                private_data, struct ctdb_recoverd);

        D_ERR("Recovery lock helper terminated, triggering an election\n");
        TALLOC_FREE(rec->recovery_lock_handle);

        force_election(rec, ctdb_get_pnn(rec->ctdb), rec->nodemap);
}

static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_cluster_mutex_handle *h;
        struct ctdb_recovery_lock_handle *s;

        s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
        if (s == NULL) {
                DBG_ERR("Memory allocation error\n");
                return false;
        }

        h = ctdb_cluster_mutex(s,
                               ctdb,
                               ctdb->recovery_lock,
                               0,
                               take_reclock_handler,
                               s,
                               lost_reclock_handler,
                               rec);
        if (h == NULL) {
                talloc_free(s);
                return false;
        }

        rec->recovery_lock_handle = s;
        s->h = h;

        while (! s->done) {
                tevent_loop_once(ctdb->ev);
        }

        if (! s->locked) {
                TALLOC_FREE(rec->recovery_lock_handle);
                return false;
        }

        ctdb_ctrl_report_recd_lock_latency(ctdb,
                                           CONTROL_TIMEOUT(),
                                           s->latency);

        return true;
}

static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
{
        if (rec->recovery_lock_handle == NULL) {
                return;
        }

        if (! rec->recovery_lock_handle->done) {
                /*
                 * Taking of recovery lock still in progress.  Free
                 * the cluster mutex handle to release it but leave
                 * the recovery lock handle in place to allow taking
                 * of the lock to fail.
                 */
                D_NOTICE("Cancelling recovery lock\n");
                TALLOC_FREE(rec->recovery_lock_handle->h);
                rec->recovery_lock_handle->done = true;
                rec->recovery_lock_handle->locked = false;
                return;
        }

        D_NOTICE("Releasing recovery lock\n");
        TALLOC_FREE(rec->recovery_lock_handle);
}
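
/* Lifecycle sketch (illustrative only, not compiled): the recovery
 * master takes the lock before running a recovery and releases it when
 * it gives up the recmaster role:
 *
 *   if (!ctdb_recovery_have_lock(rec)) {
 *           if (!ctdb_recovery_lock(rec)) {
 *                   ... abort the recovery, possibly ban ourselves ...
 *           }
 *   }
 *   ... run the recovery ...
 *   ctdb_recovery_unlock(rec);   // e.g. on losing an election
 */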

static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i;
        struct ctdb_banning_state *ban_state;

        *self_ban = false;
        for (i=0; i<ctdb->num_nodes; i++) {
                if (ctdb->nodes[i]->ban_state == NULL) {
                        continue;
                }
                ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
                if (ban_state->count < 2*ctdb->num_nodes) {
                        continue;
                }

                DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
                        ctdb->nodes[i]->pnn, ban_state->count,
                        ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
                ban_state->count = 0;

                /* Banning ourself? */
                if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
                        *self_ban = true;
                }
        }
}

struct helper_state {
        int fd[2];
        pid_t pid;
        int result;
        bool done;
};

static void helper_handler(struct tevent_context *ev,
                           struct tevent_fd *fde,
                           uint16_t flags, void *private_data)
{
        struct helper_state *state = talloc_get_type_abort(
                private_data, struct helper_state);
        int ret;

        ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
        if (ret != sizeof(state->result)) {
                state->result = EPIPE;
        }

        state->done = true;
}

static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
                      const char *prog, const char *arg, const char *type)
{
        struct helper_state *state;
        struct tevent_fd *fde;
        const char **args;
        int nargs, ret;
        uint32_t recmaster = rec->recmaster;

        state = talloc_zero(mem_ctx, struct helper_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        state->pid = -1;

        ret = pipe(state->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create pipe for %s helper\n", type));
                goto fail;
        }

        set_close_on_exec(state->fd[0]);

        nargs = 4;
        args = talloc_array(state, const char *, nargs);
        if (args == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }

        args[0] = talloc_asprintf(args, "%d", state->fd[1]);
        if (args[0] == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }
        args[1] = rec->ctdb->daemon.name;
        args[2] = arg;
        args[3] = NULL;

        if (args[2] == NULL) {
                nargs = 3;
        }

        state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
        if (state->pid == -1) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create child for %s helper\n", type));
                goto fail;
        }

        close(state->fd[1]);
        state->fd[1] = -1;

        state->done = false;

        fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
                            TEVENT_FD_READ, helper_handler, state);
        if (fde == NULL) {
                goto fail;
        }
        tevent_fd_set_auto_close(fde);

        while (!state->done) {
                tevent_loop_once(rec->ctdb->ev);

                /* If recmaster changes, we have lost election */
                if (recmaster != rec->recmaster) {
                        D_ERR("Recmaster changed to %u, aborting %s\n",
                              rec->recmaster, type);
                        state->result = 1;
                        break;
                }
        }

        close(state->fd[0]);
        state->fd[0] = -1;

        if (state->result != 0) {
                goto fail;
        }

        ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        talloc_free(state);
        return 0;

fail:
        if (state->fd[0] != -1) {
                close(state->fd[0]);
        }
        if (state->fd[1] != -1) {
                close(state->fd[1]);
        }
        if (state->pid != -1) {
                ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        }
        talloc_free(state);
        return -1;
}
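
/* Usage sketch (illustrative only, not compiled): helper_run() passes
 * the write end of a pipe as the helper's first argument and pumps the
 * event loop until the helper writes its exit status back, e.g.:
 *
 *   ret = helper_run(rec, mem_ctx, prog, arg, "recovery");
 *
 * The run is treated as failed if the recmaster changes while the
 * helper is still working, since this node has lost the election.
 */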


static int ctdb_takeover(struct ctdb_recoverd *rec,
                         uint32_t *force_rebalance_nodes)
{
        static char prog[PATH_MAX+1] = "";
        char *arg;
        int i, ret;

        if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
                             "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
                             "ctdb_takeover_helper")) {
                ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
        }

        arg = NULL;
        for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
                uint32_t pnn = force_rebalance_nodes[i];
                if (arg == NULL) {
                        arg = talloc_asprintf(rec, "%u", pnn);
                } else {
                        arg = talloc_asprintf_append(arg, ",%u", pnn);
                }
                if (arg == NULL) {
                        DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                        return -1;
                }
        }

        if (ctdb_config.failover_disabled) {
                ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
                if (ret != 0) {
                        D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
                        return -1;
                }
        }

        return helper_run(rec, rec, prog, arg, "takeover");
}

static bool do_takeover_run(struct ctdb_recoverd *rec,
                            struct ctdb_node_map_old *nodemap)
{
        uint32_t *nodes = NULL;
        struct ctdb_disable_message dtr;
        TDB_DATA data;
        int i;
        uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
        int ret;
        bool ok;

        DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));

        if (ctdb_op_is_in_progress(rec->takeover_run)) {
                DEBUG(DEBUG_ERR, (__location__
                                  " takeover run already in progress\n"));
                ok = false;
                goto done;
        }

        if (!ctdb_op_begin(rec->takeover_run)) {
                ok = false;
                goto done;
        }

        /* Disable IP checks (takeover runs, really) on other nodes
         * while doing this takeover run.  This will stop those other
         * nodes from triggering takeover runs when they think they
         * should be hosting an IP but it isn't yet on an interface.
         * Don't wait for replies since a failure here might cause some
         * noise in the logs but will not actually cause a problem.
         */
        ZERO_STRUCT(dtr);
        dtr.srvid = 0; /* No reply */
        dtr.pnn = -1;

        data.dptr  = (uint8_t*)&dtr;
        data.dsize = sizeof(dtr);

        nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);

        /* Disable for 60 seconds.  This can be a tunable later if
         * necessary.
         */
        dtr.timeout = 60;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
                }
        }

        ret = ctdb_takeover(rec, rec->force_rebalance_nodes);

        /* Reenable takeover runs and IP checks on other nodes */
        dtr.timeout = 0;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
                }
        }

        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
                ok = false;
                goto done;
        }

        ok = true;
        /* Takeover run was successful so clear force rebalance targets */
        if (rebalance_nodes == rec->force_rebalance_nodes) {
                TALLOC_FREE(rec->force_rebalance_nodes);
        } else {
                DEBUG(DEBUG_WARNING,
                      ("Rebalance target nodes changed during takeover run - not clearing\n"));
        }
done:
        rec->need_takeover_run = !ok;
        talloc_free(nodes);
        ctdb_op_end(rec->takeover_run);

        DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
        return ok;
}

static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
{
        static char prog[PATH_MAX+1] = "";
        const char *arg;

        if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
                             "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
                             "ctdb_recovery_helper")) {
                ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
        }

        arg = talloc_asprintf(mem_ctx, "%u", new_generation());
        if (arg == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);

        return helper_run(rec, mem_ctx, prog, arg, "recovery");
}

/*
  we are the recmaster, and recovery is needed - start a recovery run
 */
static int do_recovery(struct ctdb_recoverd *rec,
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
        bool self_ban;

        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));

        /* Check if the current node is still the recmaster.  It's possible that
         * re-election has changed the recmaster.
         */
        if (pnn != rec->recmaster) {
                DEBUG(DEBUG_NOTICE,
                      ("Recovery master changed to %u, aborting recovery\n",
                       rec->recmaster));
                return -1;
        }

        /* if recovery fails, force it again */
        rec->need_recovery = true;

        if (!ctdb_op_begin(rec->recovery)) {
                return -1;
        }

        if (rec->election_timeout) {
                /* an election is in progress */
                DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
                goto fail;
        }

        ban_misbehaving_nodes(rec, &self_ban);
        if (self_ban) {
                DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
                goto fail;
        }

        if (ctdb->recovery_lock != NULL) {
                if (ctdb_recovery_have_lock(rec)) {
                        D_NOTICE("Already holding recovery lock\n");
                } else {
                        bool ok;

                        D_NOTICE("Attempting to take recovery lock (%s)\n",
                                 ctdb->recovery_lock);

                        ok = ctdb_recovery_lock(rec);
                        if (! ok) {
                                D_ERR("Unable to take recovery lock\n");

                                if (pnn != rec->recmaster) {
                                        D_NOTICE("Recovery master changed to %u,"
                                                 " aborting recovery\n",
                                                 rec->recmaster);
                                        rec->need_recovery = false;
                                        goto fail;
                                }

                                if (ctdb->runstate ==
                                    CTDB_RUNSTATE_FIRST_RECOVERY) {
                                        /*
                                         * First recovery?  Perhaps
                                         * current node does not yet
                                         * know who the recmaster is.
                                         */
                                        D_ERR("Retrying recovery\n");
                                        goto fail;
                                }

                                D_ERR("Abort recovery, "
                                      "ban this node for %u seconds\n",
                                      ctdb->tunable.recovery_ban_period);
                                ctdb_ban_node(rec,
                                              pnn,
                                              ctdb->tunable.recovery_ban_period);
                                goto fail;
                        }
                        D_NOTICE("Recovery lock taken successfully\n");
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));

        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                goto fail;
        }

        /* we do the db creation before we set the recovery mode, so the freeze happens
           on all databases we will be dealing with. */

        /* verify that we have all the databases any other node has */
        ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
                goto fail;
        }

        /* verify that all other nodes have all our databases */
        ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                goto fail;
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));


        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                goto fail;
        }

        /*
          update all nodes to have the same flags that we have
         */
        for (i=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
                        if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                                DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
                        } else {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                                goto fail;
                        }
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));

        ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }

        do_takeover_run(rec, nodemap);

        /* send a message to all clients telling them that the cluster
           has been reconfigured */
        ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                       CTDB_SRVID_RECONFIGURE, tdb_null);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
                goto fail;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));

        rec->need_recovery = false;
        ctdb_op_end(rec->recovery);

        /* we managed to complete a full recovery, make sure to forgive
           any past sins by the nodes that could now participate in the
           recovery.
        */
        DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
        for (i=0;i<nodemap->num;i++) {
                struct ctdb_banning_state *ban_state;

                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
                if (ban_state == NULL) {
                        continue;
                }

                ban_state->count = 0;
        }

        /* We just finished a recovery successfully.
           We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_op_disable(rec->recovery, ctdb->ev,
                        ctdb->tunable.rerecovery_timeout);
        return 0;

fail:
        ctdb_op_end(rec->recovery);
        return -1;
}
1510
1511
1512 /*
1513   elections are decided by node flags (a banned or stopped node loses),
1514   then by priority time (the longest running node wins), then by the pnn
1515  */
1516 struct election_message {
1517         uint32_t num_connected;
1518         struct timeval priority_time;
1519         uint32_t pnn;
1520         uint32_t node_flags;
1521 };
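/* How two election messages compare (a summary of ctdb_election_win()
 * below): a banned or stopped candidate always loses; otherwise the
 * earlier priority_time (i.e. the longest running node) wins, and if
 * the times are equal the higher pnn takes the tie.  num_connected is
 * carried in the message but is not compared in this version.
 */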
1522
1523 /*
1524   form this node's election data
1525  */
1526 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1527 {
1528         int ret, i;
1529         struct ctdb_node_map_old *nodemap;
1530         struct ctdb_context *ctdb = rec->ctdb;
1531
1532         ZERO_STRUCTP(em);
1533
1534         em->pnn = rec->ctdb->pnn;
1535         em->priority_time = rec->priority_time;
1536
1537         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1538         if (ret != 0) {
1539                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1540                 return;
1541         }
1542
1543         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1544         em->node_flags = rec->node_flags;
1545
1546         for (i=0;i<nodemap->num;i++) {
1547                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1548                         em->num_connected++;
1549                 }
1550         }
1551
1552         /* we shouldn't try to win this election if we can't be the recmaster, so make our election data as unattractive as possible */
1553         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1554                 em->num_connected = 0;
1555                 em->priority_time = timeval_current();
1556         }
1557
1558         talloc_free(nodemap);
1559 }
1560
1561 /*
1562   see if we would win an election against the given election data
1563  */
1564 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1565 {
1566         struct election_message myem;
1567         int cmp = 0;
1568
1569         ctdb_election_data(rec, &myem);
1570
1571         /* we can't win if we don't have the recmaster capability */
1572         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1573                 return false;
1574         }
1575
1576         /* we can't win if we are banned */
1577         if (rec->node_flags & NODE_FLAGS_BANNED) {
1578                 return false;
1579         }
1580
1581         /* we can't win if we are stopped */
1582         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1583                 return false;
1584         }
1585
1586         /* we will automatically win if the other node is banned */
1587         if (em->node_flags & NODE_FLAGS_BANNED) {
1588                 return true;
1589         }
1590
1591         /* we will automatically win if the other node is stopped */
1592         if (em->node_flags & NODE_FLAGS_STOPPED) {
1593                 return true;
1594         }
1595
1596         /* then the longest running node (earliest priority time) wins */
1597         if (cmp == 0) {
1598                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1599         }
1600
1601         if (cmp == 0) {
1602                 cmp = (int)myem.pnn - (int)em->pnn;
1603         }
1604
1605         return cmp > 0;
1606 }
1607
1608 /*
1609   send out an election request
1610  */
1611 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1612 {
1613         int ret;
1614         TDB_DATA election_data;
1615         struct election_message emsg;
1616         uint64_t srvid;
1617         struct ctdb_context *ctdb = rec->ctdb;
1618
1619         srvid = CTDB_SRVID_ELECTION;
1620
1621         ctdb_election_data(rec, &emsg);
1622
1623         election_data.dsize = sizeof(struct election_message);
1624         election_data.dptr  = (unsigned char *)&emsg;
1625
1626
1627         /* first we assume we will win the election and set the
1628            recovery master to be ourselves on the current node
1629          */
1630         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1631                                      CTDB_CURRENT_NODE, pnn);
1632         if (ret != 0) {
1633                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1634                 return -1;
1635         }
1636         rec->recmaster = pnn;
1637
1638         /* send an election message to all active nodes */
1639         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1640         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1641 }
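/* Election flow, as implemented above and in election_handler() below:
 * a candidate first claims the recmaster role locally, then broadcasts
 * its election_message to all nodes.  Each receiving node compares the
 * message against its own data with ctdb_election_win() and either
 * schedules a competing broadcast (if it would win) or accepts the
 * sender as the new recmaster.
 */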
1642
1643 /*
1644   we think we are winning the election - send a broadcast election request
1645  */
1646 static void election_send_request(struct tevent_context *ev,
1647                                   struct tevent_timer *te,
1648                                   struct timeval t, void *p)
1649 {
1650         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1651         int ret;
1652
1653         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1654         if (ret != 0) {
1655                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1656         }
1657
1658         TALLOC_FREE(rec->send_election_te);
1659 }
1660
1661 /*
1662   handler for memory dumps
1663 */
1664 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1665 {
1666         struct ctdb_recoverd *rec = talloc_get_type(
1667                 private_data, struct ctdb_recoverd);
1668         struct ctdb_context *ctdb = rec->ctdb;
1669         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1670         TDB_DATA *dump;
1671         int ret;
1672         struct ctdb_srvid_message *rd;
1673
1674         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1675                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1676                 talloc_free(tmp_ctx);
1677                 return;
1678         }
1679         rd = (struct ctdb_srvid_message *)data.dptr;
1680
1681         dump = talloc_zero(tmp_ctx, TDB_DATA);
1682         if (dump == NULL) {
1683                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1684                 talloc_free(tmp_ctx);
1685                 return;
1686         }
1687         ret = ctdb_dump_memory(ctdb, dump);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1690                 talloc_free(tmp_ctx);
1691                 return;
1692         }
1693
1694         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1695
1696         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1697         if (ret != 0) {
1698                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1699                 talloc_free(tmp_ctx);
1700                 return;
1701         }
1702
1703         talloc_free(tmp_ctx);
1704 }
1705
1706 /*
1707   handler for reload_nodes
1708 */
1709 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1710                                  void *private_data)
1711 {
1712         struct ctdb_recoverd *rec = talloc_get_type(
1713                 private_data, struct ctdb_recoverd);
1714
1715         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1716
1717         ctdb_load_nodes_file(rec->ctdb);
1718 }
1719
1720
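/* handler for node rebalance requests: the payload is a single
 * uint32_t PNN identifying the node whose IPs should be rebalanced;
 * only the recmaster acts on it
 */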
1721 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1722                                         void *private_data)
1723 {
1724         struct ctdb_recoverd *rec = talloc_get_type(
1725                 private_data, struct ctdb_recoverd);
1726         struct ctdb_context *ctdb = rec->ctdb;
1727         uint32_t pnn;
1728         uint32_t *t;
1729         int len;
1730
1731         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1732                 return;
1733         }
1734
1735         if (data.dsize != sizeof(uint32_t)) {
1736                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(uint32_t)));
1737                 return;
1738         }
1739
1740         pnn = *(uint32_t *)&data.dptr[0];
1741
1742         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1743
1744         /* Copy any existing list of nodes.  A realloc variant could
1745          * resize in place, but freeing the old array must also cancel
1746          * the timer event for the timeout, so allocate a fresh array
1747          * and free the old one explicitly instead.
1748          */
1749         len = (rec->force_rebalance_nodes != NULL) ?
1750                 talloc_array_length(rec->force_rebalance_nodes) :
1751                 0;
1752
1753         /* This allows duplicates to be added but they don't cause
1754          * harm.  A call to add a duplicate PNN arguably means that
1755          * the timeout should be reset, so this is the simplest
1756          * solution.
1757          */
1758         t = talloc_zero_array(rec, uint32_t, len+1);
1759         CTDB_NO_MEMORY_VOID(ctdb, t);
1760         if (len > 0) {
1761                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1762         }
1763         t[len] = pnn;
1764
1765         talloc_free(rec->force_rebalance_nodes);
1766
1767         rec->force_rebalance_nodes = t;
1768 }
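/* The accumulated force_rebalance_nodes list is consumed by the next
 * takeover run; if this node loses the recmaster role the list is
 * discarded instead (see verify_local_ip_allocation() below).
 */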
1769
1770
1771
1772 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1773                                     TDB_DATA data,
1774                                     struct ctdb_op_state *op_state)
1775 {
1776         struct ctdb_disable_message *r;
1777         uint32_t timeout;
1778         TDB_DATA result;
1779         int32_t ret = 0;
1780
1781         /* Validate input data */
1782         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1783                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1784                                  "expecting %lu\n", (long unsigned)data.dsize,
1785                                  (long unsigned)sizeof(struct ctdb_disable_message)));
1786                 return;
1787         }
1788         if (data.dptr == NULL) {
1789                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1790                 return;
1791         }
1792
1793         r = (struct ctdb_disable_message *)data.dptr;
1794         timeout = r->timeout;
1795
1796         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1797         if (ret != 0) {
1798                 goto done;
1799         }
1800
1801         /* Returning our PNN tells the caller that we succeeded */
1802         ret = ctdb_get_pnn(ctdb);
1803 done:
1804         result.dsize = sizeof(int32_t);
1805         result.dptr  = (uint8_t *)&ret;
1806         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1807 }
1808
1809 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1810                                           void *private_data)
1811 {
1812         struct ctdb_recoverd *rec = talloc_get_type(
1813                 private_data, struct ctdb_recoverd);
1814
1815         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1816 }
1817
1818 /* Backward compatibility for this SRVID */
1819 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1820                                      void *private_data)
1821 {
1822         struct ctdb_recoverd *rec = talloc_get_type(
1823                 private_data, struct ctdb_recoverd);
1824         uint32_t timeout;
1825
1826         if (data.dsize != sizeof(uint32_t)) {
1827                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1828                                  "expecting %lu\n", (long unsigned)data.dsize,
1829                                  (long unsigned)sizeof(uint32_t)));
1830                 return;
1831         }
1832         if (data.dptr == NULL) {
1833                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1834                 return;
1835         }
1836
1837         timeout = *((uint32_t *)data.dptr);
1838
1839         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1840 }
1841
1842 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1843                                        void *private_data)
1844 {
1845         struct ctdb_recoverd *rec = talloc_get_type(
1846                 private_data, struct ctdb_recoverd);
1847
1848         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1849 }
1850
1851 /*
1852   handler for ip reallocate, just add it to the list of requests and 
1853   handle this later in the monitor_cluster loop so we do not recurse
1854   with other requests to takeover_run()
1855 */
1856 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1857                                   void *private_data)
1858 {
1859         struct ctdb_srvid_message *request;
1860         struct ctdb_recoverd *rec = talloc_get_type(
1861                 private_data, struct ctdb_recoverd);
1862
1863         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1864                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1865                 return;
1866         }
1867
1868         request = (struct ctdb_srvid_message *)data.dptr;
1869
1870         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1871 }
1872
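/* From the caller's side (a sketch, not a normative description): the
 * request is a struct ctdb_srvid_message carrying the caller's pnn and
 * a reply srvid.  Once the deferred takeover run has completed,
 * process_ipreallocate_requests() below answers each queued request
 * with an int32 result - the recmaster's pnn on success or -1 on
 * failure.
 */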
1873 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1874                                           struct ctdb_recoverd *rec)
1875 {
1876         TDB_DATA result;
1877         int32_t ret;
1878         struct srvid_requests *current;
1879
1880         /* Only process requests that are currently pending.  More
1881          * might come in while the takeover run is in progress and
1882          * they will need to be processed later since they might
1883          * be in response to flag changes.
1884          */
1885         current = rec->reallocate_requests;
1886         rec->reallocate_requests = NULL;
1887
1888         if (do_takeover_run(rec, rec->nodemap)) {
1889                 ret = ctdb_get_pnn(ctdb);
1890         } else {
1891                 ret = -1;
1892         }
1893
1894         result.dsize = sizeof(int32_t);
1895         result.dptr  = (uint8_t *)&ret;
1896
1897         srvid_requests_reply(ctdb, &current, result);
1898 }
1899
1900 /*
1901  * handler for assigning banning credits
1902  */
1903 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1904 {
1905         struct ctdb_recoverd *rec = talloc_get_type(
1906                 private_data, struct ctdb_recoverd);
1907         uint32_t ban_pnn;
1908
1909         /* Ignore if we are not recmaster */
1910         if (rec->ctdb->pnn != rec->recmaster) {
1911                 return;
1912         }
1913
1914         if (data.dsize != sizeof(uint32_t)) {
1915                 DEBUG(DEBUG_ERR, (__location__ " invalid data size %zu\n",
1916                                   data.dsize));
1917                 return;
1918         }
1919
1920         ban_pnn = *(uint32_t *)data.dptr;
1921
1922         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1923 }
1924
1925 /*
1926   handler for recovery master elections
1927 */
1928 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1929 {
1930         struct ctdb_recoverd *rec = talloc_get_type(
1931                 private_data, struct ctdb_recoverd);
1932         struct ctdb_context *ctdb = rec->ctdb;
1933         int ret;
1934         struct election_message *em = (struct election_message *)data.dptr;
1935
1936         /* Ignore election packets from ourself */
1937         if (ctdb->pnn == em->pnn) {
1938                 return;
1939         }
1940
1941         /* we got an election packet - update the timeout for the election */
1942         talloc_free(rec->election_timeout);
1943         rec->election_timeout = tevent_add_timer(
1944                         ctdb->ev, ctdb,
1945                         fast_start ?
1946                                 timeval_current_ofs(0, 500000) :
1947                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1948                         ctdb_election_timeout, rec);
1949
1950         /* someone called an election. check their election data
1951            and if we would rather be the elected node ourselves,
1952            send a new election message to all other nodes
1953          */
1954         if (ctdb_election_win(rec, em)) {
1955                 if (!rec->send_election_te) {
1956                         rec->send_election_te = tevent_add_timer(
1957                                         ctdb->ev, rec,
1958                                         timeval_current_ofs(0, 500000),
1959                                         election_send_request, rec);
1960                 }
1961                 return;
1962         }
1963
1964         /* we didn't win */
1965         TALLOC_FREE(rec->send_election_te);
1966
1967         /* Release the recovery lock file */
1968         if (ctdb_recovery_have_lock(rec)) {
1969                 ctdb_recovery_unlock(rec);
1970         }
1971
1972         /* ok, let that node become the recmaster then */
1973         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1974                                      CTDB_CURRENT_NODE, em->pnn);
1975         if (ret != 0) {
1976                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1977                 return;
1978         }
1979         rec->recmaster = em->pnn;
1980
1981         return;
1982 }
1983
1984
1985 /*
1986   force the start of the election process
1987  */
1988 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1989                            struct ctdb_node_map_old *nodemap)
1990 {
1991         int ret;
1992         struct ctdb_context *ctdb = rec->ctdb;
1993
1994         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1995
1996         /* set all nodes to recovery mode to stop all internode traffic */
1997         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1998         if (ret != 0) {
1999                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2000                 return;
2001         }
2002
2003         talloc_free(rec->election_timeout);
2004         rec->election_timeout = tevent_add_timer(
2005                         ctdb->ev, ctdb,
2006                         fast_start ?
2007                                 timeval_current_ofs(0, 500000) :
2008                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2009                         ctdb_election_timeout, rec);
2010
2011         ret = send_election_request(rec, pnn);
2012         if (ret != 0) {
2013                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2014                 return;
2015         }
2016
2017         /* wait for a few seconds to collect all responses */
2018         ctdb_wait_election(rec);
2019 }
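/* Note the ordering above: the whole cluster is switched to active
 * recovery mode first, so no internode database traffic races the
 * election; only then is the election request broadcast, and
 * ctdb_wait_election() blocks until the responses have been collected.
 */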
2020
2021
2022
2023 /*
2024   handler for when a node changes its flags
2025 */
2026 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2027 {
2028         struct ctdb_recoverd *rec = talloc_get_type(
2029                 private_data, struct ctdb_recoverd);
2030         struct ctdb_context *ctdb = rec->ctdb;
2031         int ret;
2032         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2033         struct ctdb_node_map_old *nodemap=NULL;
2034         TALLOC_CTX *tmp_ctx;
2035         int i;
2036
2037         if (data.dsize != sizeof(*c)) {
2038                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2039                 return;
2040         }
2041
2042         tmp_ctx = talloc_new(ctdb);
2043         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2044
2045         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2048                 talloc_free(tmp_ctx);
2049                 return;
2050         }
2051
2052
2053         for (i=0;i<nodemap->num;i++) {
2054                 if (nodemap->nodes[i].pnn == c->pnn) break;
2055         }
2056
2057         if (i == nodemap->num) {
2058                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2059                 talloc_free(tmp_ctx);
2060                 return;
2061         }
2062
2063         if (c->old_flags != c->new_flags) {
2064                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2065         }
2066
2067         nodemap->nodes[i].flags = c->new_flags;
2068
2069         talloc_free(tmp_ctx);
2070 }
2071
2072 /*
2073   handler for when we need to push out flag changes to all other nodes
2074 */
2075 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2076                                void *private_data)
2077 {
2078         struct ctdb_recoverd *rec = talloc_get_type(
2079                 private_data, struct ctdb_recoverd);
2080         struct ctdb_context *ctdb = rec->ctdb;
2081         int ret;
2082         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2083         struct ctdb_node_map_old *nodemap=NULL;
2084         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2085         uint32_t *nodes;
2086
2087         /* read the node flags from the recmaster */
2088         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2089                                    tmp_ctx, &nodemap);
2090         if (ret != 0) {
2091                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2092                 talloc_free(tmp_ctx);
2093                 return;
2094         }
2095         if (c->pnn >= nodemap->num) {
2096                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2097                 talloc_free(tmp_ctx);
2098                 return;
2099         }
2100
2101         /* send the flags update to all connected nodes */
2102         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2103
2104         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2105                                       nodes, 0, CONTROL_TIMEOUT(),
2106                                       false, data,
2107                                       NULL, NULL,
2108                                       NULL) != 0) {
2109                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2110
2111                 talloc_free(tmp_ctx);
2112                 return;
2113         }
2114
2115         talloc_free(tmp_ctx);
2116 }
2117
2118
2119 struct verify_recmode_normal_data {
2120         uint32_t count;
2121         enum monitor_result status;
2122 };
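/* verify_recmode() below uses a simple fan-out/fan-in pattern: one
 * async getrecmode control is sent per active node, every callback
 * decrements rmdata->count, and the caller spins tevent_loop_once()
 * until the count reaches zero.  verify_recmaster() further down
 * follows the same pattern for each node's view of the recmaster.
 */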
2123
2124 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2125 {
2126         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2127
2128
2129         /* one more node has responded with recmode data */
2130         rmdata->count--;
2131
2132         /* if we failed to get the recmode, then return an error and let
2133            the main loop try again.
2134         */
2135         if (state->state != CTDB_CONTROL_DONE) {
2136                 if (rmdata->status == MONITOR_OK) {
2137                         rmdata->status = MONITOR_FAILED;
2138                 }
2139                 return;
2140         }
2141
2142         /* if we got a response, then the recmode will be stored in the
2143            status field
2144         */
2145         if (state->status != CTDB_RECOVERY_NORMAL) {
2146                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2147                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2148         }
2149
2150         return;
2151 }
2152
2153
2154 /* verify that all nodes are in normal recovery mode */
2155 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2156 {
2157         struct verify_recmode_normal_data *rmdata;
2158         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2159         struct ctdb_client_control_state *state;
2160         enum monitor_result status;
2161         int j;
2162         
2163         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2164         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2165         rmdata->count  = 0;
2166         rmdata->status = MONITOR_OK;
2167
2168         /* loop over all active nodes and send an async getrecmode call to
2169            them */
2170         for (j=0; j<nodemap->num; j++) {
2171                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2172                         continue;
2173                 }
2174                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2175                                         CONTROL_TIMEOUT(), 
2176                                         nodemap->nodes[j].pnn);
2177                 if (state == NULL) {
2178                         /* we failed to send the control, treat this as 
2179                            an error and try again next iteration
2180                         */                      
2181                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2182                         talloc_free(mem_ctx);
2183                         return MONITOR_FAILED;
2184                 }
2185
2186                 /* set up the callback functions */
2187                 state->async.fn = verify_recmode_normal_callback;
2188                 state->async.private_data = rmdata;
2189
2190                 /* one more control to wait for to complete */
2191                 rmdata->count++;
2192         }
2193
2194
2195         /* now wait for up to the maximum number of seconds allowed
2196            or until all nodes we expect a response from have replied
2197         */
2198         while (rmdata->count > 0) {
2199                 tevent_loop_once(ctdb->ev);
2200         }
2201
2202         status = rmdata->status;
2203         talloc_free(mem_ctx);
2204         return status;
2205 }
2206
2207
2208 struct verify_recmaster_data {
2209         struct ctdb_recoverd *rec;
2210         uint32_t count;
2211         uint32_t pnn;
2212         enum monitor_result status;
2213 };
2214
2215 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2216 {
2217         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2218
2219
2220         /* one more node has responded with recmaster data */
2221         rmdata->count--;
2222
2223         /* if we failed to get the recmaster, then return an error and let
2224            the main loop try again.
2225         */
2226         if (state->state != CTDB_CONTROL_DONE) {
2227                 if (rmdata->status == MONITOR_OK) {
2228                         rmdata->status = MONITOR_FAILED;
2229                 }
2230                 return;
2231         }
2232
2233         /* if we got a response, then the recmaster will be stored in the
2234            status field
2235         */
2236         if (state->status != rmdata->pnn) {
2237                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2238                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2239                 rmdata->status = MONITOR_ELECTION_NEEDED;
2240         }
2241
2242         return;
2243 }
2244
2245
2246 /* verify that all nodes agree that we are the recmaster */
2247 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2248 {
2249         struct ctdb_context *ctdb = rec->ctdb;
2250         struct verify_recmaster_data *rmdata;
2251         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2252         struct ctdb_client_control_state *state;
2253         enum monitor_result status;
2254         int j;
2255         
2256         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2257         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2258         rmdata->rec    = rec;
2259         rmdata->count  = 0;
2260         rmdata->pnn    = pnn;
2261         rmdata->status = MONITOR_OK;
2262
2263         /* loop over all active nodes and send an async getrecmaster call to
2264            them */
2265         for (j=0; j<nodemap->num; j++) {
2266                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2267                         continue;
2268                 }
2269                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2270                         continue;
2271                 }
2272                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2273                                         CONTROL_TIMEOUT(),
2274                                         nodemap->nodes[j].pnn);
2275                 if (state == NULL) {
2276                         /* we failed to send the control, treat this as 
2277                            an error and try again next iteration
2278                         */                      
2279                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2280                         talloc_free(mem_ctx);
2281                         return MONITOR_FAILED;
2282                 }
2283
2284                 /* set up the callback functions */
2285                 state->async.fn = verify_recmaster_callback;
2286                 state->async.private_data = rmdata;
2287
2288                 /* one more control to wait for to complete */
2289                 rmdata->count++;
2290         }
2291
2292
2293         /* now wait for up to the maximum number of seconds allowed
2294            or until all nodes we expect a response from have replied
2295         */
2296         while (rmdata->count > 0) {
2297                 tevent_loop_once(ctdb->ev);
2298         }
2299
2300         status = rmdata->status;
2301         talloc_free(mem_ctx);
2302         return status;
2303 }
2304
2305 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2306                                     struct ctdb_recoverd *rec)
2307 {
2308         struct ctdb_iface_list_old *ifaces = NULL;
2309         TALLOC_CTX *mem_ctx;
2310         bool ret = false;
2311
2312         mem_ctx = talloc_new(NULL);
2313
2314         /* Read the interfaces from the local node */
2315         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2316                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2317                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2318                 /* We could return an error.  However, this will be
2319                  * rare so we'll decide that the interfaces have
2320                  * actually changed, just in case.
2321                  */
2322                 talloc_free(mem_ctx);
2323                 return true;
2324         }
2325
2326         if (!rec->ifaces) {
2327                 /* We haven't been here before so things have changed */
2328                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2329                 ret = true;
2330         } else if (rec->ifaces->num != ifaces->num) {
2331                 /* Number of interfaces has changed */
2332                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2333                                      rec->ifaces->num, ifaces->num));
2334                 ret = true;
2335         } else {
2336                 /* See if interface names or link states have changed */
2337                 int i;
2338                 for (i = 0; i < rec->ifaces->num; i++) {
2339                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2340                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2341                                 DEBUG(DEBUG_NOTICE,
2342                                       ("Interface in slot %d changed: %s => %s\n",
2343                                        i, iface->name, ifaces->ifaces[i].name));
2344                                 ret = true;
2345                                 break;
2346                         }
2347                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2348                                 DEBUG(DEBUG_NOTICE,
2349                                       ("Interface %s changed state: %d => %d\n",
2350                                        iface->name, iface->link_state,
2351                                        ifaces->ifaces[i].link_state));
2352                                 ret = true;
2353                                 break;
2354                         }
2355                 }
2356         }
2357
2358         talloc_free(rec->ifaces);
2359         rec->ifaces = talloc_steal(rec, ifaces);
2360
2361         talloc_free(mem_ctx);
2362         return ret;
2363 }
2364
2365 /* Check that the local allocation of public IP addresses is correct
2366  * and do some house-keeping */
2367 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2368                                       struct ctdb_recoverd *rec,
2369                                       uint32_t pnn,
2370                                       struct ctdb_node_map_old *nodemap)
2371 {
2372         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2373         int ret, j;
2374         bool need_takeover_run = false;
2375         struct ctdb_public_ip_list_old *ips = NULL;
2376
2377         /* If we are not the recmaster then do some housekeeping */
2378         if (rec->recmaster != pnn) {
2379                 /* Ignore any IP reallocate requests - only recmaster
2380                  * processes them
2381                  */
2382                 TALLOC_FREE(rec->reallocate_requests);
2383                 /* Clear any nodes that should be force rebalanced in
2384                  * the next takeover run.  If the recovery master role
2385                  * has moved then we don't want to process these some
2386                  * time in the future.
2387                  */
2388                 TALLOC_FREE(rec->force_rebalance_nodes);
2389         }
2390
2391         /* Return early if disabled... */
2392         if (ctdb_config.failover_disabled ||
2393             ctdb_op_is_disabled(rec->takeover_run)) {
2394                 return  0;
2395         }
2396
2397         if (interfaces_have_changed(ctdb, rec)) {
2398                 need_takeover_run = true;
2399         }
2400
2401         /* If there are unhosted IPs but this node can host them then
2402          * trigger an IP reallocation */
2403
2404         /* Read *available* IPs from local node */
2405         ret = ctdb_ctrl_get_public_ips_flags(
2406                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2407                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2408         if (ret != 0) {
2409                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2410                 talloc_free(mem_ctx);
2411                 return -1;
2412         }
2413
2414         for (j=0; j<ips->num; j++) {
2415                 if (ips->ips[j].pnn == -1 &&
2416                     nodemap->nodes[pnn].flags == 0) {
2417                         DEBUG(DEBUG_WARNING,
2418                               ("Unassigned IP %s can be served by this node\n",
2419                                ctdb_addr_to_str(&ips->ips[j].addr)));
2420                         need_takeover_run = true;
2421                 }
2422         }
2423
2424         talloc_free(ips);
2425
2426         if (!ctdb->do_checkpublicip) {
2427                 goto done;
2428         }
2429
2430         /* Validate the IP addresses that this node has on network
2431          * interfaces.  If there is an inconsistency between reality
2432          * and the state expected by CTDB then try to fix it by
2433          * triggering an IP reallocation or releasing extraneous IP
2434          * addresses. */
2435
2436         /* Read *known* IPs from local node */
2437         ret = ctdb_ctrl_get_public_ips_flags(
2438                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2439         if (ret != 0) {
2440                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2441                 talloc_free(mem_ctx);
2442                 return -1;
2443         }
2444
2445         for (j=0; j<ips->num; j++) {
2446                 if (ips->ips[j].pnn == pnn) {
2447                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2448                                 DEBUG(DEBUG_ERR,
2449                                       ("Assigned IP %s not on an interface\n",
2450                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2451                                 need_takeover_run = true;
2452                         }
2453                 } else {
2454                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2455                                 DEBUG(DEBUG_ERR,
2456                                       ("IP %s incorrectly on an interface\n",
2457                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2458                                 need_takeover_run = true;
2459                         }
2460                 }
2461         }
2462
2463 done:
2464         if (need_takeover_run) {
2465                 struct ctdb_srvid_message rd;
2466                 TDB_DATA data;
2467
2468                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2469
2470                 ZERO_STRUCT(rd);
2471                 rd.pnn = ctdb->pnn;
2472                 rd.srvid = 0;
2473                 data.dptr = (uint8_t *)&rd;
2474                 data.dsize = sizeof(rd);
2475
2476                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2477                 if (ret != 0) {
2478                         DEBUG(DEBUG_ERR,
2479                               ("Failed to send takeover run request\n"));
2480                 }
2481         }
2482         talloc_free(mem_ctx);
2483         return 0;
2484 }
2485
2486
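/* callback for the async GET_NODEMAP control used by
 * get_remote_nodemaps() below: each remote reply is stashed in the
 * remote_nodemaps array, indexed by the responding node's pnn
 */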
2487 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2488 {
2489         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2490
2491         if (node_pnn >= ctdb->num_nodes) {
2492                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2493                 return;
2494         }
2495
2496         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2497
2498 }
2499
2500 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2501         struct ctdb_node_map_old *nodemap,
2502         struct ctdb_node_map_old **remote_nodemaps)
2503 {
2504         uint32_t *nodes;
2505
2506         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2507         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2508                                         nodes, 0,
2509                                         CONTROL_TIMEOUT(), false, tdb_null,
2510                                         async_getnodemap_callback,
2511                                         NULL,
2512                                         remote_nodemaps) != 0) {
2513                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2514
2515                 return -1;
2516         }
2517
2518         return 0;
2519 }
2520
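/* Check that the current recmaster is still usable: it must be known,
 * still exist in the nodemap, be connected, hold the recmaster
 * capability if we do, and not consider itself inactive.  Most failed
 * checks force a new election; all of them return false.
 */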
2521 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2522                                      TALLOC_CTX *mem_ctx)
2523 {
2524         struct ctdb_context *ctdb = rec->ctdb;
2525         uint32_t pnn = ctdb_get_pnn(ctdb);
2526         struct ctdb_node_map_old *nodemap = rec->nodemap;
2527         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2528         int ret;
2529
2530         /* When the recovery daemon is started, the recmaster is set to
2531          * "unknown" so it knows to start an election.
2532          */
2533         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2534                 DEBUG(DEBUG_NOTICE,
2535                       ("Initial recovery master set - forcing election\n"));
2536                 force_election(rec, pnn, nodemap);
2537                 return false;
2538         }
2539
2540         /*
2541          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2542          * but we have, then force an election and try to become the new
2543          * recmaster.
2544          */
2545         if (!ctdb_node_has_capabilities(rec->caps,
2546                                         rec->recmaster,
2547                                         CTDB_CAP_RECMASTER) &&
2548             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2549             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2550                 DEBUG(DEBUG_ERR,
2551                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2552                        " but we (node %u) have - force an election\n",
2553                        rec->recmaster, pnn));
2554                 force_election(rec, pnn, nodemap);
2555                 return false;
2556         }
2557
2558         /* Verify that the master node has not been deleted.  This
2559          * should not happen because a node should always be shutdown
2560          * before being deleted, causing a new master to be elected
2561          * before now.  However, if something strange has happened
2562          * then checking here will ensure we don't index beyond the
2563          * end of the nodemap array. */
2564         if (rec->recmaster >= nodemap->num) {
2565                 DEBUG(DEBUG_ERR,
2566                       ("Recmaster node %u has been deleted. Force election\n",
2567                        rec->recmaster));
2568                 force_election(rec, pnn, nodemap);
2569                 return false;
2570         }
2571
2572         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2573         if (nodemap->nodes[rec->recmaster].flags &
2574             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2575                 DEBUG(DEBUG_NOTICE,
2576                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2577                        rec->recmaster));
2578                 force_election(rec, pnn, nodemap);
2579                 return false;
2580         }
2581
2582         /* get nodemap from the recovery master to check if it is inactive */
2583         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2584                                    mem_ctx, &recmaster_nodemap);
2585         if (ret != 0) {
2586                 DEBUG(DEBUG_ERR,
2587                       (__location__
2588                        " Unable to get nodemap from recovery master %u\n",
2589                           rec->recmaster));
2590                 /* No election, just error */
2591                 return false;
2592         }
2593
2594
2595         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2596             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2597                 DEBUG(DEBUG_NOTICE,
2598                       ("Recmaster node %u is inactive. Force election\n",
2599                        rec->recmaster));
2600                 /*
2601                  * update our nodemap to carry the recmaster's notion of
2602                  * its own flags, so that we don't keep freezing the
2603                  * inactive recmaster node...
2604                  */
2605                 nodemap->nodes[rec->recmaster].flags =
2606                         recmaster_nodemap->nodes[rec->recmaster].flags;
2607                 force_election(rec, pnn, nodemap);
2608                 return false;
2609         }
2610
2611         return true;
2612 }
2613
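/* One pass of the recovery daemon's monitoring loop: ping the main
 * daemon, refresh tunables, runstate and the nodemap, ban misbehaving
 * nodes, and keep an inactive (stopped or banned) node frozen and in
 * recovery mode.  If we are the recmaster, additionally verify that
 * all active nodes agree on the recmaster, the recovery mode, the
 * nodemap and the node flags, triggering an election or a recovery
 * when they do not.
 */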
2614 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2615                       TALLOC_CTX *mem_ctx)
2616 {
2617         uint32_t pnn;
2618         struct ctdb_node_map_old *nodemap=NULL;
2619         struct ctdb_node_map_old **remote_nodemaps=NULL;
2620         struct ctdb_vnn_map *vnnmap=NULL;
2621         struct ctdb_vnn_map *remote_vnnmap=NULL;
2622         uint32_t num_lmasters;
2623         int32_t debug_level;
2624         int i, j, ret;
2625         bool self_ban;
2626
2627
2628         /* verify that the main daemon is still running */
2629         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2630                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2631                 exit(-1);
2632         }
2633
2634         /* ping the local daemon to tell it we are alive */
2635         ctdb_ctrl_recd_ping(ctdb);
2636
2637         if (rec->election_timeout) {
2638                 /* an election is in progress */
2639                 return;
2640         }
2641
2642         /* read the debug level from the parent and update locally */
2643         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2644         if (ret != 0) {
2645                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2646                 return;
2647         }
2648         debuglevel_set(debug_level);
2649
2650         /* get relevant tunables */
2651         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2652         if (ret != 0) {
2653                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2654                 return;
2655         }
2656
2657         /* get runstate */
2658         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2659                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2660         if (ret != 0) {
2661                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2662                 return;
2663         }
2664
2665         pnn = ctdb_get_pnn(ctdb);
2666
2667         /* get nodemap */
2668         TALLOC_FREE(rec->nodemap);
2669         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2670         if (ret != 0) {
2671                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2672                 return;
2673         }
2674         nodemap = rec->nodemap;
2675
2676         /* remember our own node flags */
2677         rec->node_flags = nodemap->nodes[pnn].flags;
2678
2679         ban_misbehaving_nodes(rec, &self_ban);
2680         if (self_ban) {
2681                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2682                 return;
2683         }
2684
2685         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2686                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2687         if (ret != 0) {
2688                 D_ERR("Failed to read recmode from local node\n");
2689                 return;
2690         }
2691
2692         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2693            also frozen and that the recmode is set to active.
2694         */
2695         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2696                 /* If this node has become inactive then we want to
2697                  * reduce the chances of it taking over the recovery
2698                  * master role when it becomes active again.  This
2699                  * helps to stabilise the recovery master role so that
2700                  * it stays on the most stable node.
2701                  */
2702                 rec->priority_time = timeval_current();
2703
2704                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2705                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2706
2707                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2708                         if (ret != 0) {
2709                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2710
2711                                 return;
2712                         }
2713                 }
2714                 if (! rec->frozen_on_inactive) {
2715                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2716                                                CTDB_CURRENT_NODE);
2717                         if (ret != 0) {
2718                                 DEBUG(DEBUG_ERR,
2719                                       (__location__ " Failed to freeze node "
2720                                        "in STOPPED or BANNED state\n"));
2721                                 return;
2722                         }
2723
2724                         rec->frozen_on_inactive = true;
2725                 }
2726
2727                 /* If this node is stopped or banned then it is not the recovery
2728                  * master, so don't do anything. This prevents a stopped or banned
2729                  * node from starting an election and sending unnecessary controls.
2730                  */
2731                 return;
2732         }
2733
2734         rec->frozen_on_inactive = false;
2735
2736         /* Retrieve capabilities from all connected nodes */
2737         ret = update_capabilities(rec, nodemap);
2738         if (ret != 0) {
2739                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2740                 return;
2741         }
2742
2743         if (! validate_recovery_master(rec, mem_ctx)) {
2744                 return;
2745         }
2746
2747         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2748                 /* Check if an IP takeover run is needed and trigger one if
2749                  * necessary */
2750                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2751         }
2752
2753         /* if we are not the recmaster then we do not need to check
2754            if recovery is needed
2755          */
2756         if (pnn != rec->recmaster) {
2757                 return;
2758         }
2759
2760
2761         /* ensure our local copies of flags are right */
2762         ret = update_local_flags(rec, nodemap);
2763         if (ret != 0) {
2764                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2765                 return;
2766         }
2767
2768         if (ctdb->num_nodes != nodemap->num) {
2769                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2770                 ctdb_load_nodes_file(ctdb);
2771                 return;
2772         }
2773
2774         /* verify that all active nodes agree that we are the recmaster */
2775         switch (verify_recmaster(rec, nodemap, pnn)) {
2776         case MONITOR_RECOVERY_NEEDED:
2777                 /* cannot happen */
2778                 return;
2779         case MONITOR_ELECTION_NEEDED:
2780                 force_election(rec, pnn, nodemap);
2781                 return;
2782         case MONITOR_OK:
2783                 break;
2784         case MONITOR_FAILED:
2785                 return;
2786         }
2787
2788
2789         /* get the vnnmap */
2790         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2791         if (ret != 0) {
2792                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2793                 return;
2794         }
2795
2796         if (rec->need_recovery) {
2797                 /* a previous recovery didn't finish */
2798                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2799                 return;
2800         }
2801
2802         /* verify that all active nodes are in normal mode 
2803            and not in recovery mode 
2804         */
2805         switch (verify_recmode(ctdb, nodemap)) {
2806         case MONITOR_RECOVERY_NEEDED:
2807                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2808                 return;
2809         case MONITOR_FAILED:
2810                 return;
2811         case MONITOR_ELECTION_NEEDED:
2812                 /* cannot happen */
2813         case MONITOR_OK:
2814                 break;
2815         }
2816
2817
2818         if (ctdb->recovery_lock != NULL) {
2819                 /* We must already hold the recovery lock */
2820                 if (!ctdb_recovery_have_lock(rec)) {
2821                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2822                         ctdb_set_culprit(rec, ctdb->pnn);
2823                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2824                         return;
2825                 }
2826         }
2827
2828
2829         /* If recoveries are disabled then there is no use doing any
2830          * nodemap or flags checks.  Recoveries might be disabled due
2831          * to "reloadnodes", so doing these checks might cause an
2832          * unnecessary recovery.  */
2833         if (ctdb_op_is_disabled(rec->recovery)) {
2834                 goto takeover_run_checks;
2835         }
2836
2837         /* get the nodemap for all active remote nodes
2838          */
2839         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2840         if (remote_nodemaps == NULL) {
2841                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2842                 return;
2843         }
2844         for(i=0; i<nodemap->num; i++) {
2845                 remote_nodemaps[i] = NULL;
2846         }
2847         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2848                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2849                 return;
2850         } 
2851
2852         /* verify that all other nodes have the same nodemap as we have
2853         */
2854         for (j=0; j<nodemap->num; j++) {
2855                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2856                         continue;
2857                 }
2858
2859                 if (remote_nodemaps[j] == NULL) {
2860                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2861                         ctdb_set_culprit(rec, j);
2862
2863                         return;
2864                 }
2865
2866                 /* if the nodes disagree on how many nodes there are
2867                    then this is a good reason to try recovery
2868                  */
2869                 if (remote_nodemaps[j]->num != nodemap->num) {
2870                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different node count (%u vs our %u)\n",
2871                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2872                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2873                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2874                         return;
2875                 }
2876
2877                 /* if the nodes disagree on which nodes exist and are
2878                    active, then that is also a good reason to do recovery
2879                  */
2880                 for (i=0;i<nodemap->num;i++) {
2881                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2882                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2883                                           nodemap->nodes[j].pnn, i, 
2884                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2885                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2886                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2887                                             vnnmap);
2888                                 return;
2889                         }
2890                 }
2891         }

	/*
	 * Update node flags obtained from each active node. Each node
	 * reports its own flags, so this ensures we have up-to-date
	 * flag information for all the nodes.
	 */
	for (j=0; j<nodemap->num; j++) {
		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
			continue;
		}
		nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
	}

	for (j=0; j<nodemap->num; j++) {
		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
			continue;
		}

		/* verify the flags are consistent */
		for (i=0; i<nodemap->num; i++) {
			if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
				continue;
			}

			if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
				DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n",
				  nodemap->nodes[j].pnn,
				  nodemap->nodes[i].pnn,
				  remote_nodemaps[j]->nodes[i].flags,
				  nodemap->nodes[i].flags));
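				/* a node is authoritative for its own
				 * flags; for any other node the local
				 * (recmaster) view wins */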
				if (i == j) {
					DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
					update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
					ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
					do_recovery(rec, mem_ctx, pnn, nodemap,
						    vnnmap);
					return;
				} else {
					DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
					update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
					ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
					do_recovery(rec, mem_ctx, pnn, nodemap,
						    vnnmap);
					return;
				}
			}
		}
	}

	/* count how many active nodes have the lmaster capability */
	num_lmasters = 0;
	for (i=0; i<nodemap->num; i++) {
		if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
			if (ctdb_node_has_capabilities(rec->caps,
						       ctdb->nodes[i]->pnn,
						       CTDB_CAP_LMASTER)) {
				num_lmasters++;
			}
		}
	}

	/* There must be the same number of lmasters in the vnn map as
	 * there are active nodes with the lmaster capability...  or
	 * do a recovery.
	 */
	if (vnnmap->size != num_lmasters) {
		DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
			  vnnmap->size, num_lmasters));
		ctdb_set_culprit(rec, ctdb->pnn);
		do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
		return;
	}

	/* verify that all active nodes in the nodemap also exist in
	   the vnnmap.
	 */
	for (j=0; j<nodemap->num; j++) {
		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
			continue;
		}
		if (nodemap->nodes[j].pnn == pnn) {
			continue;
		}

		for (i=0; i<vnnmap->size; i++) {
			if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
				break;
			}
		}
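		/* the scan completed without a match, so the node is
		 * missing from the vnnmap */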
		if (i == vnnmap->size) {
			DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n",
				  nodemap->nodes[j].pnn));
			ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
			do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
			return;
		}
	}


	/* verify that all other nodes have the same vnnmap
	   and are from the same generation
	 */
	for (j=0; j<nodemap->num; j++) {
		if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
			continue;
		}
		if (nodemap->nodes[j].pnn == pnn) {
			continue;
		}

		ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
					  mem_ctx, &remote_vnnmap);
		if (ret != 0) {
			DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n",
				  nodemap->nodes[j].pnn));
			return;
		}

		/* verify the vnnmap generation is the same */
		if (vnnmap->generation != remote_vnnmap->generation) {
			DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n",
				  nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
			ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
			do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
			return;
		}

		/* verify the vnnmap size is the same */
		if (vnnmap->size != remote_vnnmap->size) {
			DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n",
				  nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
			ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
			do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
			return;
		}

		/* verify the vnnmap is the same */
		for (i=0; i<vnnmap->size; i++) {
			if (remote_vnnmap->map[i] != vnnmap->map[i]) {
				DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n",
					  nodemap->nodes[j].pnn));
				ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
				do_recovery(rec, mem_ctx, pnn, nodemap,
					    vnnmap);
				return;
			}
		}
	}

	/* FIXME: Add remote public IP checking to ensure that nodes
	 * have the IP addresses that are allocated to them. */

takeover_run_checks:

	/* If there are IP takeover runs requested or the previous one
	 * failed then perform one and notify the waiters */
	if (!ctdb_op_is_disabled(rec->takeover_run) &&
	    (rec->reallocate_requests || rec->need_takeover_run)) {
		process_ipreallocate_requests(ctdb, rec);
	}
}

static void recd_sig_term_handler(struct tevent_context *ev,
				  struct tevent_signal *se, int signum,
				  int count, void *dont_care,
				  void *private_data)
{
	struct ctdb_recoverd *rec = talloc_get_type_abort(
		private_data, struct ctdb_recoverd);

	DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
	ctdb_recovery_unlock(rec);
	exit(0);
}

/*
  the main monitoring loop
 */
static void monitor_cluster(struct ctdb_context *ctdb)
{
	struct tevent_signal *se;
	struct ctdb_recoverd *rec;

	DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));

	rec = talloc_zero(ctdb, struct ctdb_recoverd);
	CTDB_NO_MEMORY_FATAL(ctdb, rec);

	rec->ctdb = ctdb;
	rec->recmaster = CTDB_UNKNOWN_PNN;
	rec->recovery_lock_handle = NULL;

	rec->takeover_run = ctdb_op_init(rec, "takeover runs");
	CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);

	rec->recovery = ctdb_op_init(rec, "recoveries");
	CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);

	rec->priority_time = timeval_current();
	rec->frozen_on_inactive = false;

	se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
			       recd_sig_term_handler, rec);
	if (se == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
		exit(1);
	}

	/* register a message port for sending memory dumps */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);

	/* when a node is assigned banning credits */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
					banning_handler, rec);

	/* register a message port for recovery elections */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);

	/* when nodes are disabled/enabled */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);

	/* when we are asked to push out a flag change */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);

	/* register a message port for vacuum fetch */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);

	/* register a message port for reloadnodes */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);

	/* register a message port for performing a takeover run */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);

	/* register a message port for disabling the ip check for a short while */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);

	/* register a message port for forcing a rebalance of a node at the
	   next reallocation */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);

	/* register a message port for disabling takeover runs */
	ctdb_client_set_message_handler(ctdb,
					CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
					disable_takeover_runs_handler, rec);

	/* register a message port for disabling recoveries */
	ctdb_client_set_message_handler(ctdb,
					CTDB_SRVID_DISABLE_RECOVERIES,
					disable_recoveries_handler, rec);

	/* register a message port for detaching a database */
	ctdb_client_set_message_handler(ctdb,
					CTDB_SRVID_DETACH_DATABASE,
					detach_database_handler, rec);

	for (;;) {
		TALLOC_CTX *mem_ctx = talloc_new(ctdb);
		struct timeval start;
		double elapsed;

		if (!mem_ctx) {
			DEBUG(DEBUG_CRIT,(__location__
					  " Failed to create temp context\n"));
			exit(-1);
		}

		start = timeval_current();
		main_loop(ctdb, rec, mem_ctx);
		talloc_free(mem_ctx);

		/* rate-limit the loop to one pass per
		 * recover_interval seconds */
		elapsed = timeval_elapsed(&start);
		if (elapsed < ctdb->tunable.recover_interval) {
			ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
					  - elapsed);
		}
	}
}

/*
  event handler for when the main ctdbd dies
 */
static void ctdb_recoverd_parent(struct tevent_context *ev,
				 struct tevent_fd *fde,
				 uint16_t flags, void *private_data)
{
	DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
	_exit(1);
}

/*
  called regularly to verify that the recovery daemon is still running
 */
static void ctdb_check_recd(struct tevent_context *ev,
			    struct tevent_timer *te,
			    struct timeval t, void *p)
{
	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);

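	/* signal 0 delivers nothing; it only probes whether the
	 * process still exists */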
3194         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3195                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3196
3197                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3198                                  ctdb_restart_recd, ctdb);
3199
3200                 return;
3201         }
3202
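	/* reschedule this check to run again in 30 seconds */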
3203         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3204                          timeval_current_ofs(30, 0),
3205                          ctdb_check_recd, ctdb);
3206 }

static void recd_sig_child_handler(struct tevent_context *ev,
				   struct tevent_signal *se, int signum,
				   int count, void *dont_care,
				   void *private_data)
{
	int status;
	pid_t pid = -1;

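	/* reap every exited child without blocking; stop when no
	 * children are left pending */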
	while (pid != 0) {
		pid = waitpid(-1, &status, WNOHANG);
		if (pid == -1) {
			if (errno != ECHILD) {
				DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno), errno));
			}
			return;
		}
		if (pid > 0) {
			DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
		}
	}
}

/*
  startup the recovery daemon as a child of the main ctdb daemon
 */
int ctdb_start_recoverd(struct ctdb_context *ctdb)
{
	int fd[2];
	struct tevent_signal *se;
	struct tevent_fd *fde;
	int ret;

	if (pipe(fd) != 0) {
		return -1;
	}

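	/* The pipe created above lets the child detect the death of
	 * the parent: the parent keeps the write end open and the
	 * child watches the read end, which becomes readable (EOF)
	 * when the parent exits.
	 */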
3245         ctdb->recoverd_pid = ctdb_fork(ctdb);
3246         if (ctdb->recoverd_pid == -1) {
3247                 return -1;
3248         }
3249
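	/* parent: keep the write end of the pipe open and check every
	 * 30 seconds that the child is still alive */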
3250         if (ctdb->recoverd_pid != 0) {
3251                 talloc_free(ctdb->recd_ctx);
3252                 ctdb->recd_ctx = talloc_new(ctdb);
3253                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3254
3255                 close(fd[0]);
3256                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3257                                  timeval_current_ofs(30, 0),
3258                                  ctdb_check_recd, ctdb);
3259                 return 0;
3260         }
3261
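	/* child: close the write end so that only the parent holds
	 * it open */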
	close(fd[1]);

	srandom(getpid() ^ time(NULL));

	ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
	if (ret != 0) {
		return -1;
	}

	prctl_set_comment("ctdb_recoverd");
	if (switch_from_server_to_client(ctdb) != 0) {
		DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
		exit(1);
	}

	DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));

	fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
			    ctdb_recoverd_parent, &fd[0]);
	tevent_fd_set_auto_close(fde);

	/* set up a handler to pick up sigchld */
	se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
			       recd_sig_child_handler, ctdb);
	if (se == NULL) {
		DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
		exit(1);
	}

	monitor_cluster(ctdb);

	DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
	return -1;
}

/*
  shutdown the recovery daemon
 */
void ctdb_stop_recoverd(struct ctdb_context *ctdb)
{
	if (ctdb->recoverd_pid == 0) {
		return;
	}

	DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
	ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);

	TALLOC_FREE(ctdb->recd_ctx);
	TALLOC_FREE(ctdb->recd_ping_count);
}

static void ctdb_restart_recd(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
	ctdb_stop_recoverd(ctdb);
	ctdb_start_recoverd(ctdb);
}