578127a4514755705c4b5c6eff10fe6c1f2fc086
[metze/samba/wip.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36 #include "lib/util/util_process.h"
37
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/system_socket.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "server/ctdb_config.h"
46
47 #include "ctdb_cluster_mutex.h"
48
49 /* List of SRVID requests that need to be processed */
50 struct srvid_list {
51         struct srvid_list *next, *prev;
52         struct ctdb_srvid_message *request;
53 };
54
55 struct srvid_requests {
56         struct srvid_list *requests;
57 };
58
59 static void srvid_request_reply(struct ctdb_context *ctdb,
60                                 struct ctdb_srvid_message *request,
61                                 TDB_DATA result)
62 {
63         /* Someone that sent srvid==0 does not want a reply */
64         if (request->srvid == 0) {
65                 talloc_free(request);
66                 return;
67         }
68
69         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
70                                      result) == 0) {
71                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
72                                   (unsigned)request->pnn,
73                                   (unsigned long long)request->srvid));
74         } else {
75                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
76                                  (unsigned)request->pnn,
77                                  (unsigned long long)request->srvid));
78         }
79
80         talloc_free(request);
81 }
82
83 static void srvid_requests_reply(struct ctdb_context *ctdb,
84                                  struct srvid_requests **requests,
85                                  TDB_DATA result)
86 {
87         struct srvid_list *r;
88
89         if (*requests == NULL) {
90                 return;
91         }
92
93         for (r = (*requests)->requests; r != NULL; r = r->next) {
94                 srvid_request_reply(ctdb, r->request, result);
95         }
96
97         /* Free the list structure... */
98         TALLOC_FREE(*requests);
99 }
100
101 static void srvid_request_add(struct ctdb_context *ctdb,
102                               struct srvid_requests **requests,
103                               struct ctdb_srvid_message *request)
104 {
105         struct srvid_list *t;
106         int32_t ret;
107         TDB_DATA result;
108
109         if (*requests == NULL) {
110                 *requests = talloc_zero(ctdb, struct srvid_requests);
111                 if (*requests == NULL) {
112                         goto nomem;
113                 }
114         }
115
116         t = talloc_zero(*requests, struct srvid_list);
117         if (t == NULL) {
118                 /* If *requests was just allocated above then free it */
119                 if ((*requests)->requests == NULL) {
120                         TALLOC_FREE(*requests);
121                 }
122                 goto nomem;
123         }
124
125         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
126         DLIST_ADD((*requests)->requests, t);
127
128         return;
129
130 nomem:
131         /* Failed to add the request to the list.  Send a fail. */
132         DEBUG(DEBUG_ERR, (__location__
133                           " Out of memory, failed to queue SRVID request\n"));
134         ret = -ENOMEM;
135         result.dsize = sizeof(ret);
136         result.dptr = (uint8_t *)&ret;
137         srvid_request_reply(ctdb, request, result);
138 }
139
140 /* An abstraction to allow an operation (takeover runs, recoveries,
141  * ...) to be disabled for a given timeout */
142 struct ctdb_op_state {
143         struct tevent_timer *timer;
144         bool in_progress;
145         const char *name;
146 };
147
148 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
149 {
150         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
151
152         if (state != NULL) {
153                 state->in_progress = false;
154                 state->name = name;
155         }
156
157         return state;
158 }
159
160 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
161 {
162         return state->timer != NULL;
163 }
164
165 static bool ctdb_op_begin(struct ctdb_op_state *state)
166 {
167         if (ctdb_op_is_disabled(state)) {
168                 DEBUG(DEBUG_NOTICE,
169                       ("Unable to begin - %s are disabled\n", state->name));
170                 return false;
171         }
172
173         state->in_progress = true;
174         return true;
175 }
176
177 static bool ctdb_op_end(struct ctdb_op_state *state)
178 {
179         return state->in_progress = false;
180 }
181
182 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
183 {
184         return state->in_progress;
185 }
186
187 static void ctdb_op_enable(struct ctdb_op_state *state)
188 {
189         TALLOC_FREE(state->timer);
190 }
191
192 static void ctdb_op_timeout_handler(struct tevent_context *ev,
193                                     struct tevent_timer *te,
194                                     struct timeval yt, void *p)
195 {
196         struct ctdb_op_state *state =
197                 talloc_get_type(p, struct ctdb_op_state);
198
199         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
200         ctdb_op_enable(state);
201 }
202
203 static int ctdb_op_disable(struct ctdb_op_state *state,
204                            struct tevent_context *ev,
205                            uint32_t timeout)
206 {
207         if (timeout == 0) {
208                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
209                 ctdb_op_enable(state);
210                 return 0;
211         }
212
213         if (state->in_progress) {
214                 DEBUG(DEBUG_ERR,
215                       ("Unable to disable %s - in progress\n", state->name));
216                 return -EAGAIN;
217         }
218
219         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
220                             state->name, timeout));
221
222         /* Clear any old timers */
223         talloc_free(state->timer);
224
225         /* Arrange for the timeout to occur */
226         state->timer = tevent_add_timer(ev, state,
227                                         timeval_current_ofs(timeout, 0),
228                                         ctdb_op_timeout_handler, state);
229         if (state->timer == NULL) {
230                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
231                 return -ENOMEM;
232         }
233
234         return 0;
235 }
236
237 struct ctdb_banning_state {
238         uint32_t count;
239         struct timeval last_reported_time;
240 };
241
242 struct ctdb_recovery_lock_handle;
243
244 /*
245   private state of recovery daemon
246  */
247 struct ctdb_recoverd {
248         struct ctdb_context *ctdb;
249         uint32_t recmaster;
250         uint32_t last_culprit_node;
251         struct ctdb_node_map_old *nodemap;
252         struct timeval priority_time;
253         bool need_takeover_run;
254         bool need_recovery;
255         uint32_t node_flags;
256         struct tevent_timer *send_election_te;
257         struct tevent_timer *election_timeout;
258         struct srvid_requests *reallocate_requests;
259         struct ctdb_op_state *takeover_run;
260         struct ctdb_op_state *recovery;
261         struct ctdb_iface_list_old *ifaces;
262         uint32_t *force_rebalance_nodes;
263         struct ctdb_node_capabilities *caps;
264         bool frozen_on_inactive;
265         struct ctdb_recovery_lock_handle *recovery_lock_handle;
266 };
267
268 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
269 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
270
271 static void ctdb_restart_recd(struct tevent_context *ev,
272                               struct tevent_timer *te, struct timeval t,
273                               void *private_data);
274
275 /*
276   ban a node for a period of time
277  */
278 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
279 {
280         int ret;
281         struct ctdb_context *ctdb = rec->ctdb;
282         struct ctdb_ban_state bantime;
283
284         if (!ctdb_validate_pnn(ctdb, pnn)) {
285                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
286                 return;
287         }
288
289         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
290
291         bantime.pnn  = pnn;
292         bantime.time = ban_time;
293
294         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
295         if (ret != 0) {
296                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
297                 return;
298         }
299
300 }
301
302 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
303
304
305 /*
306   remember the trouble maker
307  */
308 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
309 {
310         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
311         struct ctdb_banning_state *ban_state;
312
313         if (culprit > ctdb->num_nodes) {
314                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
315                 return;
316         }
317
318         /* If we are banned or stopped, do not set other nodes as culprits */
319         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
320                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
321                 return;
322         }
323
324         if (ctdb->nodes[culprit]->ban_state == NULL) {
325                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
326                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
327
328                 
329         }
330         ban_state = ctdb->nodes[culprit]->ban_state;
331         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
332                 /* this was the first time in a long while this node
333                    misbehaved so we will forgive any old transgressions.
334                 */
335                 ban_state->count = 0;
336         }
337
338         ban_state->count += count;
339         ban_state->last_reported_time = timeval_current();
340         rec->last_culprit_node = culprit;
341 }
342
343 /*
344   remember the trouble maker
345  */
346 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
347 {
348         ctdb_set_culprit_count(rec, culprit, 1);
349 }
350
351 /*
352   Retrieve capabilities from all connected nodes
353  */
354 static int update_capabilities(struct ctdb_recoverd *rec,
355                                struct ctdb_node_map_old *nodemap)
356 {
357         uint32_t *capp;
358         TALLOC_CTX *tmp_ctx;
359         struct ctdb_node_capabilities *caps;
360         struct ctdb_context *ctdb = rec->ctdb;
361
362         tmp_ctx = talloc_new(rec);
363         CTDB_NO_MEMORY(ctdb, tmp_ctx);
364
365         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
366                                      CONTROL_TIMEOUT(), nodemap);
367
368         if (caps == NULL) {
369                 DEBUG(DEBUG_ERR,
370                       (__location__ " Failed to get node capabilities\n"));
371                 talloc_free(tmp_ctx);
372                 return -1;
373         }
374
375         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
376         if (capp == NULL) {
377                 DEBUG(DEBUG_ERR,
378                       (__location__
379                        " Capabilities don't include current node.\n"));
380                 talloc_free(tmp_ctx);
381                 return -1;
382         }
383         ctdb->capabilities = *capp;
384
385         TALLOC_FREE(rec->caps);
386         rec->caps = talloc_steal(rec, caps);
387
388         talloc_free(tmp_ctx);
389         return 0;
390 }
391
392 /*
393   change recovery mode on all nodes
394  */
395 static int set_recovery_mode(struct ctdb_context *ctdb,
396                              struct ctdb_recoverd *rec,
397                              struct ctdb_node_map_old *nodemap,
398                              uint32_t rec_mode)
399 {
400         TDB_DATA data;
401         uint32_t *nodes;
402         TALLOC_CTX *tmp_ctx;
403
404         tmp_ctx = talloc_new(ctdb);
405         CTDB_NO_MEMORY(ctdb, tmp_ctx);
406
407         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
408
409         data.dsize = sizeof(uint32_t);
410         data.dptr = (unsigned char *)&rec_mode;
411
412         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
413                                         nodes, 0,
414                                         CONTROL_TIMEOUT(),
415                                         false, data,
416                                         NULL, NULL,
417                                         NULL) != 0) {
418                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
419                 talloc_free(tmp_ctx);
420                 return -1;
421         }
422
423         talloc_free(tmp_ctx);
424         return 0;
425 }
426
427 /*
428   ensure all other nodes have attached to any databases that we have
429  */
430 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
431                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
432 {
433         int i, j, db, ret;
434         struct ctdb_dbid_map_old *remote_dbmap;
435
436         /* verify that all other nodes have all our databases */
437         for (j=0; j<nodemap->num; j++) {
438                 /* we don't need to ourself ourselves */
439                 if (nodemap->nodes[j].pnn == pnn) {
440                         continue;
441                 }
442                 /* don't check nodes that are unavailable */
443                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
444                         continue;
445                 }
446
447                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
448                                          mem_ctx, &remote_dbmap);
449                 if (ret != 0) {
450                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
451                         return -1;
452                 }
453
454                 /* step through all local databases */
455                 for (db=0; db<dbmap->num;db++) {
456                         const char *name;
457
458
459                         for (i=0;i<remote_dbmap->num;i++) {
460                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
461                                         break;
462                                 }
463                         }
464                         /* the remote node already have this database */
465                         if (i!=remote_dbmap->num) {
466                                 continue;
467                         }
468                         /* ok so we need to create this database */
469                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
470                                                   dbmap->dbs[db].db_id, mem_ctx,
471                                                   &name);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
474                                 return -1;
475                         }
476                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
477                                                  nodemap->nodes[j].pnn,
478                                                  mem_ctx, name,
479                                                  dbmap->dbs[db].flags, NULL);
480                         if (ret != 0) {
481                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
482                                 return -1;
483                         }
484                 }
485         }
486
487         return 0;
488 }
489
490
491 /*
492   ensure we are attached to any databases that anyone else is attached to
493  */
494 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
495                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
496 {
497         int i, j, db, ret;
498         struct ctdb_dbid_map_old *remote_dbmap;
499
500         /* verify that we have all database any other node has */
501         for (j=0; j<nodemap->num; j++) {
502                 /* we don't need to ourself ourselves */
503                 if (nodemap->nodes[j].pnn == pnn) {
504                         continue;
505                 }
506                 /* don't check nodes that are unavailable */
507                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
508                         continue;
509                 }
510
511                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
512                                          mem_ctx, &remote_dbmap);
513                 if (ret != 0) {
514                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
515                         return -1;
516                 }
517
518                 /* step through all databases on the remote node */
519                 for (db=0; db<remote_dbmap->num;db++) {
520                         const char *name;
521
522                         for (i=0;i<(*dbmap)->num;i++) {
523                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
524                                         break;
525                                 }
526                         }
527                         /* we already have this db locally */
528                         if (i!=(*dbmap)->num) {
529                                 continue;
530                         }
531                         /* ok so we need to create this database and
532                            rebuild dbmap
533                          */
534                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
535                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
536                         if (ret != 0) {
537                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
538                                           nodemap->nodes[j].pnn));
539                                 return -1;
540                         }
541                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
542                                            mem_ctx, name,
543                                            remote_dbmap->dbs[db].flags, NULL);
544                         if (ret != 0) {
545                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
546                                 return -1;
547                         }
548                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
549                         if (ret != 0) {
550                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
551                                 return -1;
552                         }
553                 }
554         }
555
556         return 0;
557 }
558
559 /*
560   update flags on all active nodes
561  */
562 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
563 {
564         int ret;
565
566         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
567                 if (ret != 0) {
568                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
569                 return -1;
570         }
571
572         return 0;
573 }
574
575 /*
576   called when a vacuum fetch has completed - just free it and do the next one
577  */
578 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
579 {
580         talloc_free(state);
581 }
582
583
584 /**
585  * Process one elements of the vacuum fetch list:
586  * Migrate it over to us with the special flag
587  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
588  */
589 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
590                                      uint32_t pnn,
591                                      struct ctdb_rec_data_old *r)
592 {
593         struct ctdb_client_call_state *state;
594         TDB_DATA data;
595         struct ctdb_ltdb_header *hdr;
596         struct ctdb_call call;
597
598         ZERO_STRUCT(call);
599         call.call_id = CTDB_NULL_FUNC;
600         call.flags = CTDB_IMMEDIATE_MIGRATION;
601         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
602
603         call.key.dptr = &r->data[0];
604         call.key.dsize = r->keylen;
605
606         /* ensure we don't block this daemon - just skip a record if we can't get
607            the chainlock */
608         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
609                 return true;
610         }
611
612         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
613         if (data.dptr == NULL) {
614                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
615                 return true;
616         }
617
618         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
619                 free(data.dptr);
620                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
621                 return true;
622         }
623
624         hdr = (struct ctdb_ltdb_header *)data.dptr;
625         if (hdr->dmaster == pnn) {
626                 /* its already local */
627                 free(data.dptr);
628                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
629                 return true;
630         }
631
632         free(data.dptr);
633
634         state = ctdb_call_send(ctdb_db, &call);
635         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
636         if (state == NULL) {
637                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
638                 return false;
639         }
640         state->async.fn = vacuum_fetch_callback;
641         state->async.private_data = NULL;
642
643         return true;
644 }
645
646
647 /*
648   handler for vacuum fetch
649 */
650 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
651                                  void *private_data)
652 {
653         struct ctdb_recoverd *rec = talloc_get_type(
654                 private_data, struct ctdb_recoverd);
655         struct ctdb_context *ctdb = rec->ctdb;
656         struct ctdb_marshall_buffer *recs;
657         int ret, i;
658         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
659         const char *name;
660         struct ctdb_dbid_map_old *dbmap=NULL;
661         uint8_t db_flags = 0;
662         struct ctdb_db_context *ctdb_db;
663         struct ctdb_rec_data_old *r;
664
665         recs = (struct ctdb_marshall_buffer *)data.dptr;
666
667         if (recs->count == 0) {
668                 goto done;
669         }
670
671         /* work out if the database is persistent */
672         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
673         if (ret != 0) {
674                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
675                 goto done;
676         }
677
678         for (i=0;i<dbmap->num;i++) {
679                 if (dbmap->dbs[i].db_id == recs->db_id) {
680                         db_flags = dbmap->dbs[i].flags;
681                         break;
682                 }
683         }
684         if (i == dbmap->num) {
685                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
686                 goto done;
687         }
688
689         /* find the name of this database */
690         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
691                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
692                 goto done;
693         }
694
695         /* attach to it */
696         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
697         if (ctdb_db == NULL) {
698                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
699                 goto done;
700         }
701
702         r = (struct ctdb_rec_data_old *)&recs->data[0];
703         while (recs->count) {
704                 bool ok;
705
706                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
707                 if (!ok) {
708                         break;
709                 }
710
711                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
712                 recs->count--;
713         }
714
715 done:
716         talloc_free(tmp_ctx);
717 }
718
719
720 /*
721  * handler for database detach
722  */
723 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
724                                     void *private_data)
725 {
726         struct ctdb_recoverd *rec = talloc_get_type(
727                 private_data, struct ctdb_recoverd);
728         struct ctdb_context *ctdb = rec->ctdb;
729         uint32_t db_id;
730         struct ctdb_db_context *ctdb_db;
731
732         if (data.dsize != sizeof(db_id)) {
733                 return;
734         }
735         db_id = *(uint32_t *)data.dptr;
736
737         ctdb_db = find_ctdb_db(ctdb, db_id);
738         if (ctdb_db == NULL) {
739                 /* database is not attached */
740                 return;
741         }
742
743         DLIST_REMOVE(ctdb->db_list, ctdb_db);
744
745         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
746                              ctdb_db->db_name));
747         talloc_free(ctdb_db);
748 }
749
750 /*
751   called when ctdb_wait_timeout should finish
752  */
753 static void ctdb_wait_handler(struct tevent_context *ev,
754                               struct tevent_timer *te,
755                               struct timeval yt, void *p)
756 {
757         uint32_t *timed_out = (uint32_t *)p;
758         (*timed_out) = 1;
759 }
760
761 /*
762   wait for a given number of seconds
763  */
764 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
765 {
766         uint32_t timed_out = 0;
767         time_t usecs = (secs - (time_t)secs) * 1000000;
768         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
769                          ctdb_wait_handler, &timed_out);
770         while (!timed_out) {
771                 tevent_loop_once(ctdb->ev);
772         }
773 }
774
775 /*
776   called when an election times out (ends)
777  */
778 static void ctdb_election_timeout(struct tevent_context *ev,
779                                   struct tevent_timer *te,
780                                   struct timeval t, void *p)
781 {
782         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
783         rec->election_timeout = NULL;
784         fast_start = false;
785
786         DEBUG(DEBUG_WARNING,("Election period ended\n"));
787 }
788
789
790 /*
791   wait for an election to finish. It finished election_timeout seconds after
792   the last election packet is received
793  */
794 static void ctdb_wait_election(struct ctdb_recoverd *rec)
795 {
796         struct ctdb_context *ctdb = rec->ctdb;
797         while (rec->election_timeout) {
798                 tevent_loop_once(ctdb->ev);
799         }
800 }
801
802 /*
803   Update our local flags from all remote connected nodes. 
804   This is only run when we are or we belive we are the recovery master
805  */
806 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
807 {
808         int j;
809         struct ctdb_context *ctdb = rec->ctdb;
810         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
811
812         /* get the nodemap for all active remote nodes and verify
813            they are the same as for this node
814          */
815         for (j=0; j<nodemap->num; j++) {
816                 struct ctdb_node_map_old *remote_nodemap=NULL;
817                 int ret;
818
819                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
820                         continue;
821                 }
822                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
823                         continue;
824                 }
825
826                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
827                                            mem_ctx, &remote_nodemap);
828                 if (ret != 0) {
829                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
830                                   nodemap->nodes[j].pnn));
831                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
832                         talloc_free(mem_ctx);
833                         return -1;
834                 }
835                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
836                         /* We should tell our daemon about this so it
837                            updates its flags or else we will log the same 
838                            message again in the next iteration of recovery.
839                            Since we are the recovery master we can just as
840                            well update the flags on all nodes.
841                         */
842                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
843                         if (ret != 0) {
844                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
845                                 return -1;
846                         }
847
848                         /* Update our local copy of the flags in the recovery
849                            daemon.
850                         */
851                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
852                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
853                                  nodemap->nodes[j].flags));
854                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
855                 }
856                 talloc_free(remote_nodemap);
857         }
858         talloc_free(mem_ctx);
859         return 0;
860 }
861
862
863 /* Create a new random generation id.
864    The generation id can not be the INVALID_GENERATION id
865 */
866 static uint32_t new_generation(void)
867 {
868         uint32_t generation;
869
870         while (1) {
871                 generation = random();
872
873                 if (generation != INVALID_GENERATION) {
874                         break;
875                 }
876         }
877
878         return generation;
879 }
880
881 static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
882 {
883         return (rec->recovery_lock_handle != NULL);
884 }
885
886 struct ctdb_recovery_lock_handle {
887         bool done;
888         bool locked;
889         double latency;
890         struct ctdb_cluster_mutex_handle *h;
891 };
892
893 static void take_reclock_handler(char status,
894                                  double latency,
895                                  void *private_data)
896 {
897         struct ctdb_recovery_lock_handle *s =
898                 (struct ctdb_recovery_lock_handle *) private_data;
899
900         switch (status) {
901         case '0':
902                 s->latency = latency;
903                 break;
904
905         case '1':
906                 DEBUG(DEBUG_ERR,
907                       ("Unable to take recovery lock - contention\n"));
908                 break;
909
910         default:
911                 DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
912         }
913
914         s->done = true;
915         s->locked = (status == '0') ;
916 }
917
918 static void force_election(struct ctdb_recoverd *rec,
919                            uint32_t pnn,
920                            struct ctdb_node_map_old *nodemap);
921
922 static void lost_reclock_handler(void *private_data)
923 {
924         struct ctdb_recoverd *rec = talloc_get_type_abort(
925                 private_data, struct ctdb_recoverd);
926
927         D_ERR("Recovery lock helper terminated, triggering an election\n");
928         TALLOC_FREE(rec->recovery_lock_handle);
929
930         force_election(rec, ctdb_get_pnn(rec->ctdb), rec->nodemap);
931 }
932
933 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
934 {
935         struct ctdb_context *ctdb = rec->ctdb;
936         struct ctdb_cluster_mutex_handle *h;
937         struct ctdb_recovery_lock_handle *s;
938
939         s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
940         if (s == NULL) {
941                 DBG_ERR("Memory allocation error\n");
942                 return false;
943         };
944
945         h = ctdb_cluster_mutex(s,
946                                ctdb,
947                                ctdb->recovery_lock,
948                                0,
949                                take_reclock_handler,
950                                s,
951                                lost_reclock_handler,
952                                rec);
953         if (h == NULL) {
954                 talloc_free(s);
955                 return false;
956         }
957
958         rec->recovery_lock_handle = s;
959         s->h = h;
960
961         while (! s->done) {
962                 tevent_loop_once(ctdb->ev);
963         }
964
965         if (! s->locked) {
966                 TALLOC_FREE(rec->recovery_lock_handle);
967                 return false;
968         }
969
970         ctdb_ctrl_report_recd_lock_latency(ctdb,
971                                            CONTROL_TIMEOUT(),
972                                            s->latency);
973
974         return true;
975 }
976
977 static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
978 {
979         if (rec->recovery_lock_handle == NULL) {
980                 return;
981         }
982
983         if (! rec->recovery_lock_handle->done) {
984                 /*
985                  * Taking of recovery lock still in progress.  Free
986                  * the cluster mutex handle to release it but leave
987                  * the recovery lock handle in place to allow taking
988                  * of the lock to fail.
989                  */
990                 D_NOTICE("Cancelling recovery lock\n");
991                 TALLOC_FREE(rec->recovery_lock_handle->h);
992                 rec->recovery_lock_handle->done = true;
993                 rec->recovery_lock_handle->locked = false;
994                 return;
995         }
996
997         D_NOTICE("Releasing recovery lock\n");
998         TALLOC_FREE(rec->recovery_lock_handle);
999 }
1000
1001 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1002 {
1003         struct ctdb_context *ctdb = rec->ctdb;
1004         int i;
1005         struct ctdb_banning_state *ban_state;
1006
1007         *self_ban = false;
1008         for (i=0; i<ctdb->num_nodes; i++) {
1009                 if (ctdb->nodes[i]->ban_state == NULL) {
1010                         continue;
1011                 }
1012                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1013                 if (ban_state->count < 2*ctdb->num_nodes) {
1014                         continue;
1015                 }
1016
1017                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1018                         ctdb->nodes[i]->pnn, ban_state->count,
1019                         ctdb->tunable.recovery_ban_period));
1020                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1021                 ban_state->count = 0;
1022
1023                 /* Banning ourself? */
1024                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1025                         *self_ban = true;
1026                 }
1027         }
1028 }
1029
1030 struct helper_state {
1031         int fd[2];
1032         pid_t pid;
1033         int result;
1034         bool done;
1035 };
1036
1037 static void helper_handler(struct tevent_context *ev,
1038                            struct tevent_fd *fde,
1039                            uint16_t flags, void *private_data)
1040 {
1041         struct helper_state *state = talloc_get_type_abort(
1042                 private_data, struct helper_state);
1043         int ret;
1044
1045         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1046         if (ret != sizeof(state->result)) {
1047                 state->result = EPIPE;
1048         }
1049
1050         state->done = true;
1051 }
1052
1053 static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1054                       const char *prog, const char *arg, const char *type)
1055 {
1056         struct helper_state *state;
1057         struct tevent_fd *fde;
1058         const char **args;
1059         int nargs, ret;
1060         uint32_t recmaster = rec->recmaster;
1061
1062         state = talloc_zero(mem_ctx, struct helper_state);
1063         if (state == NULL) {
1064                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1065                 return -1;
1066         }
1067
1068         state->pid = -1;
1069
1070         ret = pipe(state->fd);
1071         if (ret != 0) {
1072                 DEBUG(DEBUG_ERR,
1073                       ("Failed to create pipe for %s helper\n", type));
1074                 goto fail;
1075         }
1076
1077         set_close_on_exec(state->fd[0]);
1078
1079         nargs = 4;
1080         args = talloc_array(state, const char *, nargs);
1081         if (args == NULL) {
1082                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1083                 goto fail;
1084         }
1085
1086         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1087         if (args[0] == NULL) {
1088                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1089                 goto fail;
1090         }
1091         args[1] = rec->ctdb->daemon.name;
1092         args[2] = arg;
1093         args[3] = NULL;
1094
1095         if (args[2] == NULL) {
1096                 nargs = 3;
1097         }
1098
1099         state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
1100         if (state->pid == -1) {
1101                 DEBUG(DEBUG_ERR,
1102                       ("Failed to create child for %s helper\n", type));
1103                 goto fail;
1104         }
1105
1106         close(state->fd[1]);
1107         state->fd[1] = -1;
1108
1109         state->done = false;
1110
1111         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1112                             TEVENT_FD_READ, helper_handler, state);
1113         if (fde == NULL) {
1114                 goto fail;
1115         }
1116         tevent_fd_set_auto_close(fde);
1117
1118         while (!state->done) {
1119                 tevent_loop_once(rec->ctdb->ev);
1120
1121                 /* If recmaster changes, we have lost election */
1122                 if (recmaster != rec->recmaster) {
1123                         D_ERR("Recmaster changed to %u, aborting %s\n",
1124                               rec->recmaster, type);
1125                         state->result = 1;
1126                         break;
1127                 }
1128         }
1129
1130         close(state->fd[0]);
1131         state->fd[0] = -1;
1132
1133         if (state->result != 0) {
1134                 goto fail;
1135         }
1136
1137         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1138         talloc_free(state);
1139         return 0;
1140
1141 fail:
1142         if (state->fd[0] != -1) {
1143                 close(state->fd[0]);
1144         }
1145         if (state->fd[1] != -1) {
1146                 close(state->fd[1]);
1147         }
1148         if (state->pid != -1) {
1149                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1150         }
1151         talloc_free(state);
1152         return -1;
1153 }
1154
1155
1156 static int ctdb_takeover(struct ctdb_recoverd *rec,
1157                          uint32_t *force_rebalance_nodes)
1158 {
1159         static char prog[PATH_MAX+1] = "";
1160         char *arg;
1161         int i, ret;
1162
1163         if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
1164                              "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
1165                              "ctdb_takeover_helper")) {
1166                 ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
1167         }
1168
1169         arg = NULL;
1170         for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
1171                 uint32_t pnn = force_rebalance_nodes[i];
1172                 if (arg == NULL) {
1173                         arg = talloc_asprintf(rec, "%u", pnn);
1174                 } else {
1175                         arg = talloc_asprintf_append(arg, ",%u", pnn);
1176                 }
1177                 if (arg == NULL) {
1178                         DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1179                         return -1;
1180                 }
1181         }
1182
1183         if (ctdb_config.failover_disabled) {
1184                 ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
1185                 if (ret != 0) {
1186                         D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
1187                         return -1;
1188                 }
1189         }
1190
1191         return helper_run(rec, rec, prog, arg, "takeover");
1192 }
1193
1194 static bool do_takeover_run(struct ctdb_recoverd *rec,
1195                             struct ctdb_node_map_old *nodemap)
1196 {
1197         uint32_t *nodes = NULL;
1198         struct ctdb_disable_message dtr;
1199         TDB_DATA data;
1200         int i;
1201         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1202         int ret;
1203         bool ok;
1204
1205         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1206
1207         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1208                 DEBUG(DEBUG_ERR, (__location__
1209                                   " takeover run already in progress \n"));
1210                 ok = false;
1211                 goto done;
1212         }
1213
1214         if (!ctdb_op_begin(rec->takeover_run)) {
1215                 ok = false;
1216                 goto done;
1217         }
1218
1219         /* Disable IP checks (takeover runs, really) on other nodes
1220          * while doing this takeover run.  This will stop those other
1221          * nodes from triggering takeover runs when think they should
1222          * be hosting an IP but it isn't yet on an interface.  Don't
1223          * wait for replies since a failure here might cause some
1224          * noise in the logs but will not actually cause a problem.
1225          */
1226         ZERO_STRUCT(dtr);
1227         dtr.srvid = 0; /* No reply */
1228         dtr.pnn = -1;
1229
1230         data.dptr  = (uint8_t*)&dtr;
1231         data.dsize = sizeof(dtr);
1232
1233         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1234
1235         /* Disable for 60 seconds.  This can be a tunable later if
1236          * necessary.
1237          */
1238         dtr.timeout = 60;
1239         for (i = 0; i < talloc_array_length(nodes); i++) {
1240                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1241                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1242                                              data) != 0) {
1243                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1244                 }
1245         }
1246
1247         ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
1248
1249         /* Reenable takeover runs and IP checks on other nodes */
1250         dtr.timeout = 0;
1251         for (i = 0; i < talloc_array_length(nodes); i++) {
1252                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1253                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1254                                              data) != 0) {
1255                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1256                 }
1257         }
1258
1259         if (ret != 0) {
1260                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1261                 ok = false;
1262                 goto done;
1263         }
1264
1265         ok = true;
1266         /* Takeover run was successful so clear force rebalance targets */
1267         if (rebalance_nodes == rec->force_rebalance_nodes) {
1268                 TALLOC_FREE(rec->force_rebalance_nodes);
1269         } else {
1270                 DEBUG(DEBUG_WARNING,
1271                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1272         }
1273 done:
1274         rec->need_takeover_run = !ok;
1275         talloc_free(nodes);
1276         ctdb_op_end(rec->takeover_run);
1277
1278         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1279         return ok;
1280 }
1281
1282 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1283 {
1284         static char prog[PATH_MAX+1] = "";
1285         const char *arg;
1286
1287         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1288                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1289                              "ctdb_recovery_helper")) {
1290                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1291         }
1292
1293         arg = talloc_asprintf(mem_ctx, "%u", new_generation());
1294         if (arg == NULL) {
1295                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1296                 return -1;
1297         }
1298
1299         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1300
1301         return helper_run(rec, mem_ctx, prog, arg, "recovery");
1302 }
1303
1304 /*
1305   we are the recmaster, and recovery is needed - start a recovery run
1306  */
1307 static int do_recovery(struct ctdb_recoverd *rec,
1308                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1309                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1310 {
1311         struct ctdb_context *ctdb = rec->ctdb;
1312         int i, ret;
1313         struct ctdb_dbid_map_old *dbmap;
1314         bool self_ban;
1315
1316         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1317
1318         /* Check if the current node is still the recmaster.  It's possible that
1319          * re-election has changed the recmaster.
1320          */
1321         if (pnn != rec->recmaster) {
1322                 DEBUG(DEBUG_NOTICE,
1323                       ("Recovery master changed to %u, aborting recovery\n",
1324                        rec->recmaster));
1325                 return -1;
1326         }
1327
1328         /* if recovery fails, force it again */
1329         rec->need_recovery = true;
1330
1331         if (!ctdb_op_begin(rec->recovery)) {
1332                 return -1;
1333         }
1334
1335         if (rec->election_timeout) {
1336                 /* an election is in progress */
1337                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1338                 goto fail;
1339         }
1340
1341         ban_misbehaving_nodes(rec, &self_ban);
1342         if (self_ban) {
1343                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1344                 goto fail;
1345         }
1346
1347         if (ctdb->recovery_lock != NULL) {
1348                 if (ctdb_recovery_have_lock(rec)) {
1349                         D_NOTICE("Already holding recovery lock\n");
1350                 } else {
1351                         bool ok;
1352
1353                         D_NOTICE("Attempting to take recovery lock (%s)\n",
1354                                  ctdb->recovery_lock);
1355
1356                         ok = ctdb_recovery_lock(rec);
1357                         if (! ok) {
1358                                 D_ERR("Unable to take recovery lock\n");
1359
1360                                 if (pnn != rec->recmaster) {
1361                                         D_NOTICE("Recovery master changed to %u,"
1362                                                  " aborting recovery\n",
1363                                                  rec->recmaster);
1364                                         rec->need_recovery = false;
1365                                         goto fail;
1366                                 }
1367
1368                                 if (ctdb->runstate ==
1369                                     CTDB_RUNSTATE_FIRST_RECOVERY) {
1370                                         /*
1371                                          * First recovery?  Perhaps
1372                                          * current node does not yet
1373                                          * know who the recmaster is.
1374                                          */
1375                                         D_ERR("Retrying recovery\n");
1376                                         goto fail;
1377                                 }
1378
1379                                 D_ERR("Abort recovery, "
1380                                       "ban this node for %u seconds\n",
1381                                       ctdb->tunable.recovery_ban_period);
1382                                 ctdb_ban_node(rec,
1383                                               pnn,
1384                                               ctdb->tunable.recovery_ban_period);
1385                                 goto fail;
1386                         }
1387                         D_NOTICE("Recovery lock taken successfully\n");
1388                 }
1389         }
1390
1391         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1392
1393         /* get a list of all databases */
1394         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1395         if (ret != 0) {
1396                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1397                 goto fail;
1398         }
1399
1400         /* we do the db creation before we set the recovery mode, so the freeze happens
1401            on all databases we will be dealing with. */
1402
1403         /* verify that we have all the databases any other node has */
1404         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1407                 goto fail;
1408         }
1409
1410         /* verify that all other nodes have all our databases */
1411         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1412         if (ret != 0) {
1413                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1414                 goto fail;
1415         }
1416         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1417
1418
1419         /* Retrieve capabilities from all connected nodes */
1420         ret = update_capabilities(rec, nodemap);
1421         if (ret!=0) {
1422                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1423                 return -1;
1424         }
1425
1426         /*
1427           update all nodes to have the same flags that we have
1428          */
1429         for (i=0;i<nodemap->num;i++) {
1430                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1431                         continue;
1432                 }
1433
1434                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1435                 if (ret != 0) {
1436                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1437                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1438                         } else {
1439                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1440                                 return -1;
1441                         }
1442                 }
1443         }
1444
1445         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1446
1447         ret = db_recovery_parallel(rec, mem_ctx);
1448         if (ret != 0) {
1449                 goto fail;
1450         }
1451
1452         do_takeover_run(rec, nodemap);
1453
1454         /* send a message to all clients telling them that the cluster 
1455            has been reconfigured */
1456         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
1457                                        CTDB_SRVID_RECONFIGURE, tdb_null);
1458         if (ret != 0) {
1459                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
1460                 goto fail;
1461         }
1462
1463         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1464
1465         rec->need_recovery = false;
1466         ctdb_op_end(rec->recovery);
1467
1468         /* we managed to complete a full recovery, make sure to forgive
1469            any past sins by the nodes that could now participate in the
1470            recovery.
1471         */
1472         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1473         for (i=0;i<nodemap->num;i++) {
1474                 struct ctdb_banning_state *ban_state;
1475
1476                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1477                         continue;
1478                 }
1479
1480                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1481                 if (ban_state == NULL) {
1482                         continue;
1483                 }
1484
1485                 ban_state->count = 0;
1486         }
1487
1488         /* We just finished a recovery successfully.
1489            We now wait for rerecovery_timeout before we allow
1490            another recovery to take place.
1491         */
1492         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1493         ctdb_op_disable(rec->recovery, ctdb->ev,
1494                         ctdb->tunable.rerecovery_timeout);
1495         return 0;
1496
1497 fail:
1498         ctdb_op_end(rec->recovery);
1499         return -1;
1500 }
1501
1502
1503 /*
1504   elections are won by first checking the number of connected nodes, then
1505   the priority time, then the pnn
1506  */
1507 struct election_message {
1508         uint32_t num_connected;
1509         struct timeval priority_time;
1510         uint32_t pnn;
1511         uint32_t node_flags;
1512 };
1513
1514 /*
1515   form this nodes election data
1516  */
1517 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1518 {
1519         int ret, i;
1520         struct ctdb_node_map_old *nodemap;
1521         struct ctdb_context *ctdb = rec->ctdb;
1522
1523         ZERO_STRUCTP(em);
1524
1525         em->pnn = rec->ctdb->pnn;
1526         em->priority_time = rec->priority_time;
1527
1528         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1529         if (ret != 0) {
1530                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1531                 return;
1532         }
1533
1534         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1535         em->node_flags = rec->node_flags;
1536
1537         for (i=0;i<nodemap->num;i++) {
1538                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1539                         em->num_connected++;
1540                 }
1541         }
1542
1543         /* we shouldnt try to win this election if we cant be a recmaster */
1544         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1545                 em->num_connected = 0;
1546                 em->priority_time = timeval_current();
1547         }
1548
1549         talloc_free(nodemap);
1550 }
1551
1552 /*
1553   see if the given election data wins
1554  */
1555 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1556 {
1557         struct election_message myem;
1558         int cmp = 0;
1559
1560         ctdb_election_data(rec, &myem);
1561
1562         /* we cant win if we don't have the recmaster capability */
1563         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1564                 return false;
1565         }
1566
1567         /* we cant win if we are banned */
1568         if (rec->node_flags & NODE_FLAGS_BANNED) {
1569                 return false;
1570         }
1571
1572         /* we cant win if we are stopped */
1573         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1574                 return false;
1575         }
1576
1577         /* we will automatically win if the other node is banned */
1578         if (em->node_flags & NODE_FLAGS_BANNED) {
1579                 return true;
1580         }
1581
1582         /* we will automatically win if the other node is banned */
1583         if (em->node_flags & NODE_FLAGS_STOPPED) {
1584                 return true;
1585         }
1586
1587         /* then the longest running node */
1588         if (cmp == 0) {
1589                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1590         }
1591
1592         if (cmp == 0) {
1593                 cmp = (int)myem.pnn - (int)em->pnn;
1594         }
1595
1596         return cmp > 0;
1597 }
1598
1599 /*
1600   send out an election request
1601  */
1602 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1603 {
1604         int ret;
1605         TDB_DATA election_data;
1606         struct election_message emsg;
1607         uint64_t srvid;
1608         struct ctdb_context *ctdb = rec->ctdb;
1609
1610         srvid = CTDB_SRVID_ELECTION;
1611
1612         ctdb_election_data(rec, &emsg);
1613
1614         election_data.dsize = sizeof(struct election_message);
1615         election_data.dptr  = (unsigned char *)&emsg;
1616
1617
1618         /* first we assume we will win the election and set 
1619            recoverymaster to be ourself on the current node
1620          */
1621         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1622                                      CTDB_CURRENT_NODE, pnn);
1623         if (ret != 0) {
1624                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1625                 return -1;
1626         }
1627         rec->recmaster = pnn;
1628
1629         /* send an election message to all active nodes */
1630         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1631         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1632 }
1633
1634 /*
1635   we think we are winning the election - send a broadcast election request
1636  */
1637 static void election_send_request(struct tevent_context *ev,
1638                                   struct tevent_timer *te,
1639                                   struct timeval t, void *p)
1640 {
1641         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1642         int ret;
1643
1644         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1645         if (ret != 0) {
1646                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1647         }
1648
1649         TALLOC_FREE(rec->send_election_te);
1650 }
1651
1652 /*
1653   handler for memory dumps
1654 */
1655 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1656 {
1657         struct ctdb_recoverd *rec = talloc_get_type(
1658                 private_data, struct ctdb_recoverd);
1659         struct ctdb_context *ctdb = rec->ctdb;
1660         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1661         TDB_DATA *dump;
1662         int ret;
1663         struct ctdb_srvid_message *rd;
1664
1665         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1666                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1667                 talloc_free(tmp_ctx);
1668                 return;
1669         }
1670         rd = (struct ctdb_srvid_message *)data.dptr;
1671
1672         dump = talloc_zero(tmp_ctx, TDB_DATA);
1673         if (dump == NULL) {
1674                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1675                 talloc_free(tmp_ctx);
1676                 return;
1677         }
1678         ret = ctdb_dump_memory(ctdb, dump);
1679         if (ret != 0) {
1680                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1681                 talloc_free(tmp_ctx);
1682                 return;
1683         }
1684
1685 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1686
1687         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1690                 talloc_free(tmp_ctx);
1691                 return;
1692         }
1693
1694         talloc_free(tmp_ctx);
1695 }
1696
1697 /*
1698   handler for reload_nodes
1699 */
1700 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1701                                  void *private_data)
1702 {
1703         struct ctdb_recoverd *rec = talloc_get_type(
1704                 private_data, struct ctdb_recoverd);
1705
1706         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1707
1708         ctdb_load_nodes_file(rec->ctdb);
1709 }
1710
1711
1712 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1713                                         void *private_data)
1714 {
1715         struct ctdb_recoverd *rec = talloc_get_type(
1716                 private_data, struct ctdb_recoverd);
1717         struct ctdb_context *ctdb = rec->ctdb;
1718         uint32_t pnn;
1719         uint32_t *t;
1720         int len;
1721
1722         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1723                 return;
1724         }
1725
1726         if (data.dsize != sizeof(uint32_t)) {
1727                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1728                 return;
1729         }
1730
1731         pnn = *(uint32_t *)&data.dptr[0];
1732
1733         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1734
1735         /* Copy any existing list of nodes.  There's probably some
1736          * sort of realloc variant that will do this but we need to
1737          * make sure that freeing the old array also cancels the timer
1738          * event for the timeout... not sure if realloc will do that.
1739          */
1740         len = (rec->force_rebalance_nodes != NULL) ?
1741                 talloc_array_length(rec->force_rebalance_nodes) :
1742                 0;
1743
1744         /* This allows duplicates to be added but they don't cause
1745          * harm.  A call to add a duplicate PNN arguably means that
1746          * the timeout should be reset, so this is the simplest
1747          * solution.
1748          */
1749         t = talloc_zero_array(rec, uint32_t, len+1);
1750         CTDB_NO_MEMORY_VOID(ctdb, t);
1751         if (len > 0) {
1752                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1753         }
1754         t[len] = pnn;
1755
1756         talloc_free(rec->force_rebalance_nodes);
1757
1758         rec->force_rebalance_nodes = t;
1759 }
1760
1761
1762
1763 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1764                                     TDB_DATA data,
1765                                     struct ctdb_op_state *op_state)
1766 {
1767         struct ctdb_disable_message *r;
1768         uint32_t timeout;
1769         TDB_DATA result;
1770         int32_t ret = 0;
1771
1772         /* Validate input data */
1773         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1774                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1775                                  "expecting %lu\n", (long unsigned)data.dsize,
1776                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
1777                 return;
1778         }
1779         if (data.dptr == NULL) {
1780                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1781                 return;
1782         }
1783
1784         r = (struct ctdb_disable_message *)data.dptr;
1785         timeout = r->timeout;
1786
1787         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1788         if (ret != 0) {
1789                 goto done;
1790         }
1791
1792         /* Returning our PNN tells the caller that we succeeded */
1793         ret = ctdb_get_pnn(ctdb);
1794 done:
1795         result.dsize = sizeof(int32_t);
1796         result.dptr  = (uint8_t *)&ret;
1797         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1798 }
1799
1800 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1801                                           void *private_data)
1802 {
1803         struct ctdb_recoverd *rec = talloc_get_type(
1804                 private_data, struct ctdb_recoverd);
1805
1806         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1807 }
1808
1809 /* Backward compatibility for this SRVID */
1810 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1811                                      void *private_data)
1812 {
1813         struct ctdb_recoverd *rec = talloc_get_type(
1814                 private_data, struct ctdb_recoverd);
1815         uint32_t timeout;
1816
1817         if (data.dsize != sizeof(uint32_t)) {
1818                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1819                                  "expecting %lu\n", (long unsigned)data.dsize,
1820                                  (long unsigned)sizeof(uint32_t)));
1821                 return;
1822         }
1823         if (data.dptr == NULL) {
1824                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1825                 return;
1826         }
1827
1828         timeout = *((uint32_t *)data.dptr);
1829
1830         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1831 }
1832
1833 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1834                                        void *private_data)
1835 {
1836         struct ctdb_recoverd *rec = talloc_get_type(
1837                 private_data, struct ctdb_recoverd);
1838
1839         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1840 }
1841
1842 /*
1843   handler for ip reallocate, just add it to the list of requests and 
1844   handle this later in the monitor_cluster loop so we do not recurse
1845   with other requests to takeover_run()
1846 */
1847 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1848                                   void *private_data)
1849 {
1850         struct ctdb_srvid_message *request;
1851         struct ctdb_recoverd *rec = talloc_get_type(
1852                 private_data, struct ctdb_recoverd);
1853
1854         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1855                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1856                 return;
1857         }
1858
1859         request = (struct ctdb_srvid_message *)data.dptr;
1860
1861         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1862 }
1863
1864 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1865                                           struct ctdb_recoverd *rec)
1866 {
1867         TDB_DATA result;
1868         int32_t ret;
1869         struct srvid_requests *current;
1870
1871         /* Only process requests that are currently pending.  More
1872          * might come in while the takeover run is in progress and
1873          * they will need to be processed later since they might
1874          * be in response flag changes.
1875          */
1876         current = rec->reallocate_requests;
1877         rec->reallocate_requests = NULL;
1878
1879         if (do_takeover_run(rec, rec->nodemap)) {
1880                 ret = ctdb_get_pnn(ctdb);
1881         } else {
1882                 ret = -1;
1883         }
1884
1885         result.dsize = sizeof(int32_t);
1886         result.dptr  = (uint8_t *)&ret;
1887
1888         srvid_requests_reply(ctdb, &current, result);
1889 }
1890
1891 /*
1892  * handler for assigning banning credits
1893  */
1894 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1895 {
1896         struct ctdb_recoverd *rec = talloc_get_type(
1897                 private_data, struct ctdb_recoverd);
1898         uint32_t ban_pnn;
1899
1900         /* Ignore if we are not recmaster */
1901         if (rec->ctdb->pnn != rec->recmaster) {
1902                 return;
1903         }
1904
1905         if (data.dsize != sizeof(uint32_t)) {
1906                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
1907                                   data.dsize));
1908                 return;
1909         }
1910
1911         ban_pnn = *(uint32_t *)data.dptr;
1912
1913         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1914 }
1915
1916 /*
1917   handler for recovery master elections
1918 */
1919 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1920 {
1921         struct ctdb_recoverd *rec = talloc_get_type(
1922                 private_data, struct ctdb_recoverd);
1923         struct ctdb_context *ctdb = rec->ctdb;
1924         int ret;
1925         struct election_message *em = (struct election_message *)data.dptr;
1926
1927         /* Ignore election packets from ourself */
1928         if (ctdb->pnn == em->pnn) {
1929                 return;
1930         }
1931
1932         /* we got an election packet - update the timeout for the election */
1933         talloc_free(rec->election_timeout);
1934         rec->election_timeout = tevent_add_timer(
1935                         ctdb->ev, ctdb,
1936                         fast_start ?
1937                                 timeval_current_ofs(0, 500000) :
1938                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1939                         ctdb_election_timeout, rec);
1940
1941         /* someone called an election. check their election data
1942            and if we disagree and we would rather be the elected node, 
1943            send a new election message to all other nodes
1944          */
1945         if (ctdb_election_win(rec, em)) {
1946                 if (!rec->send_election_te) {
1947                         rec->send_election_te = tevent_add_timer(
1948                                         ctdb->ev, rec,
1949                                         timeval_current_ofs(0, 500000),
1950                                         election_send_request, rec);
1951                 }
1952                 return;
1953         }
1954
1955         /* we didn't win */
1956         TALLOC_FREE(rec->send_election_te);
1957
1958         /* Release the recovery lock file */
1959         if (ctdb_recovery_have_lock(rec)) {
1960                 ctdb_recovery_unlock(rec);
1961         }
1962
1963         /* ok, let that guy become recmaster then */
1964         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1965                                      CTDB_CURRENT_NODE, em->pnn);
1966         if (ret != 0) {
1967                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
1968                 return;
1969         }
1970         rec->recmaster = em->pnn;
1971
1972         return;
1973 }
1974
1975
1976 /*
1977   force the start of the election process
1978  */
1979 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1980                            struct ctdb_node_map_old *nodemap)
1981 {
1982         int ret;
1983         struct ctdb_context *ctdb = rec->ctdb;
1984
1985         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1986
1987         /* set all nodes to recovery mode to stop all internode traffic */
1988         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1989         if (ret != 0) {
1990                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1991                 return;
1992         }
1993
1994         talloc_free(rec->election_timeout);
1995         rec->election_timeout = tevent_add_timer(
1996                         ctdb->ev, ctdb,
1997                         fast_start ?
1998                                 timeval_current_ofs(0, 500000) :
1999                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2000                         ctdb_election_timeout, rec);
2001
2002         ret = send_election_request(rec, pnn);
2003         if (ret!=0) {
2004                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2005                 return;
2006         }
2007
2008         /* wait for a few seconds to collect all responses */
2009         ctdb_wait_election(rec);
2010 }
2011
2012
2013
2014 /*
2015   handler for when a node changes its flags
2016 */
2017 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2018 {
2019         struct ctdb_recoverd *rec = talloc_get_type(
2020                 private_data, struct ctdb_recoverd);
2021         struct ctdb_context *ctdb = rec->ctdb;
2022         int ret;
2023         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2024         struct ctdb_node_map_old *nodemap=NULL;
2025         TALLOC_CTX *tmp_ctx;
2026         int i;
2027
2028         if (data.dsize != sizeof(*c)) {
2029                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2030                 return;
2031         }
2032
2033         tmp_ctx = talloc_new(ctdb);
2034         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2035
2036         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2037         if (ret != 0) {
2038                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2039                 talloc_free(tmp_ctx);
2040                 return;         
2041         }
2042
2043
2044         for (i=0;i<nodemap->num;i++) {
2045                 if (nodemap->nodes[i].pnn == c->pnn) break;
2046         }
2047
2048         if (i == nodemap->num) {
2049                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2050                 talloc_free(tmp_ctx);
2051                 return;
2052         }
2053
2054         if (c->old_flags != c->new_flags) {
2055                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2056         }
2057
2058         nodemap->nodes[i].flags = c->new_flags;
2059
2060         talloc_free(tmp_ctx);
2061 }
2062
2063 /*
2064   handler for when we need to push out flag changes ot all other nodes
2065 */
2066 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2067                                void *private_data)
2068 {
2069         struct ctdb_recoverd *rec = talloc_get_type(
2070                 private_data, struct ctdb_recoverd);
2071         struct ctdb_context *ctdb = rec->ctdb;
2072         int ret;
2073         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2074         struct ctdb_node_map_old *nodemap=NULL;
2075         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2076         uint32_t *nodes;
2077
2078         /* read the node flags from the recmaster */
2079         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2080                                    tmp_ctx, &nodemap);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2083                 talloc_free(tmp_ctx);
2084                 return;
2085         }
2086         if (c->pnn >= nodemap->num) {
2087                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2088                 talloc_free(tmp_ctx);
2089                 return;
2090         }
2091
2092         /* send the flags update to all connected nodes */
2093         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2094
2095         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2096                                       nodes, 0, CONTROL_TIMEOUT(),
2097                                       false, data,
2098                                       NULL, NULL,
2099                                       NULL) != 0) {
2100                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2101
2102                 talloc_free(tmp_ctx);
2103                 return;
2104         }
2105
2106         talloc_free(tmp_ctx);
2107 }
2108
2109
2110 struct verify_recmode_normal_data {
2111         uint32_t count;
2112         enum monitor_result status;
2113 };
2114
2115 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2116 {
2117         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2118
2119
2120         /* one more node has responded with recmode data*/
2121         rmdata->count--;
2122
2123         /* if we failed to get the recmode, then return an error and let
2124            the main loop try again.
2125         */
2126         if (state->state != CTDB_CONTROL_DONE) {
2127                 if (rmdata->status == MONITOR_OK) {
2128                         rmdata->status = MONITOR_FAILED;
2129                 }
2130                 return;
2131         }
2132
2133         /* if we got a response, then the recmode will be stored in the
2134            status field
2135         */
2136         if (state->status != CTDB_RECOVERY_NORMAL) {
2137                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2138                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2139         }
2140
2141         return;
2142 }
2143
2144
2145 /* verify that all nodes are in normal recovery mode */
2146 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2147 {
2148         struct verify_recmode_normal_data *rmdata;
2149         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2150         struct ctdb_client_control_state *state;
2151         enum monitor_result status;
2152         int j;
2153         
2154         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2155         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2156         rmdata->count  = 0;
2157         rmdata->status = MONITOR_OK;
2158
2159         /* loop over all active nodes and send an async getrecmode call to 
2160            them*/
2161         for (j=0; j<nodemap->num; j++) {
2162                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2163                         continue;
2164                 }
2165                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2166                                         CONTROL_TIMEOUT(), 
2167                                         nodemap->nodes[j].pnn);
2168                 if (state == NULL) {
2169                         /* we failed to send the control, treat this as 
2170                            an error and try again next iteration
2171                         */                      
2172                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2173                         talloc_free(mem_ctx);
2174                         return MONITOR_FAILED;
2175                 }
2176
2177                 /* set up the callback functions */
2178                 state->async.fn = verify_recmode_normal_callback;
2179                 state->async.private_data = rmdata;
2180
2181                 /* one more control to wait for to complete */
2182                 rmdata->count++;
2183         }
2184
2185
2186         /* now wait for up to the maximum number of seconds allowed
2187            or until all nodes we expect a response from has replied
2188         */
2189         while (rmdata->count > 0) {
2190                 tevent_loop_once(ctdb->ev);
2191         }
2192
2193         status = rmdata->status;
2194         talloc_free(mem_ctx);
2195         return status;
2196 }
2197
2198
2199 struct verify_recmaster_data {
2200         struct ctdb_recoverd *rec;
2201         uint32_t count;
2202         uint32_t pnn;
2203         enum monitor_result status;
2204 };
2205
2206 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2207 {
2208         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2209
2210
2211         /* one more node has responded with recmaster data*/
2212         rmdata->count--;
2213
2214         /* if we failed to get the recmaster, then return an error and let
2215            the main loop try again.
2216         */
2217         if (state->state != CTDB_CONTROL_DONE) {
2218                 if (rmdata->status == MONITOR_OK) {
2219                         rmdata->status = MONITOR_FAILED;
2220                 }
2221                 return;
2222         }
2223
2224         /* if we got a response, then the recmaster will be stored in the
2225            status field
2226         */
2227         if (state->status != rmdata->pnn) {
2228                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2229                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2230                 rmdata->status = MONITOR_ELECTION_NEEDED;
2231         }
2232
2233         return;
2234 }
2235
2236
2237 /* verify that all nodes agree that we are the recmaster */
2238 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2239 {
2240         struct ctdb_context *ctdb = rec->ctdb;
2241         struct verify_recmaster_data *rmdata;
2242         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2243         struct ctdb_client_control_state *state;
2244         enum monitor_result status;
2245         int j;
2246         
2247         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2248         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2249         rmdata->rec    = rec;
2250         rmdata->count  = 0;
2251         rmdata->pnn    = pnn;
2252         rmdata->status = MONITOR_OK;
2253
2254         /* loop over all active nodes and send an async getrecmaster call to
2255            them*/
2256         for (j=0; j<nodemap->num; j++) {
2257                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2258                         continue;
2259                 }
2260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2261                         continue;
2262                 }
2263                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2264                                         CONTROL_TIMEOUT(),
2265                                         nodemap->nodes[j].pnn);
2266                 if (state == NULL) {
2267                         /* we failed to send the control, treat this as 
2268                            an error and try again next iteration
2269                         */                      
2270                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2271                         talloc_free(mem_ctx);
2272                         return MONITOR_FAILED;
2273                 }
2274
2275                 /* set up the callback functions */
2276                 state->async.fn = verify_recmaster_callback;
2277                 state->async.private_data = rmdata;
2278
2279                 /* one more control to wait for to complete */
2280                 rmdata->count++;
2281         }
2282
2283
2284         /* now wait for up to the maximum number of seconds allowed
2285            or until all nodes we expect a response from has replied
2286         */
2287         while (rmdata->count > 0) {
2288                 tevent_loop_once(ctdb->ev);
2289         }
2290
2291         status = rmdata->status;
2292         talloc_free(mem_ctx);
2293         return status;
2294 }
2295
2296 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2297                                     struct ctdb_recoverd *rec)
2298 {
2299         struct ctdb_iface_list_old *ifaces = NULL;
2300         TALLOC_CTX *mem_ctx;
2301         bool ret = false;
2302
2303         mem_ctx = talloc_new(NULL);
2304
2305         /* Read the interfaces from the local node */
2306         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2307                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2308                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2309                 /* We could return an error.  However, this will be
2310                  * rare so we'll decide that the interfaces have
2311                  * actually changed, just in case.
2312                  */
2313                 talloc_free(mem_ctx);
2314                 return true;
2315         }
2316
2317         if (!rec->ifaces) {
2318                 /* We haven't been here before so things have changed */
2319                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2320                 ret = true;
2321         } else if (rec->ifaces->num != ifaces->num) {
2322                 /* Number of interfaces has changed */
2323                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2324                                      rec->ifaces->num, ifaces->num));
2325                 ret = true;
2326         } else {
2327                 /* See if interface names or link states have changed */
2328                 int i;
2329                 for (i = 0; i < rec->ifaces->num; i++) {
2330                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2331                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2332                                 DEBUG(DEBUG_NOTICE,
2333                                       ("Interface in slot %d changed: %s => %s\n",
2334                                        i, iface->name, ifaces->ifaces[i].name));
2335                                 ret = true;
2336                                 break;
2337                         }
2338                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2339                                 DEBUG(DEBUG_NOTICE,
2340                                       ("Interface %s changed state: %d => %d\n",
2341                                        iface->name, iface->link_state,
2342                                        ifaces->ifaces[i].link_state));
2343                                 ret = true;
2344                                 break;
2345                         }
2346                 }
2347         }
2348
2349         talloc_free(rec->ifaces);
2350         rec->ifaces = talloc_steal(rec, ifaces);
2351
2352         talloc_free(mem_ctx);
2353         return ret;
2354 }
2355
2356 /* Check that the local allocation of public IP addresses is correct
2357  * and do some house-keeping */
2358 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2359                                       struct ctdb_recoverd *rec,
2360                                       uint32_t pnn,
2361                                       struct ctdb_node_map_old *nodemap)
2362 {
2363         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2364         int ret, j;
2365         bool need_takeover_run = false;
2366         struct ctdb_public_ip_list_old *ips = NULL;
2367
2368         /* If we are not the recmaster then do some housekeeping */
2369         if (rec->recmaster != pnn) {
2370                 /* Ignore any IP reallocate requests - only recmaster
2371                  * processes them
2372                  */
2373                 TALLOC_FREE(rec->reallocate_requests);
2374                 /* Clear any nodes that should be force rebalanced in
2375                  * the next takeover run.  If the recovery master role
2376                  * has moved then we don't want to process these some
2377                  * time in the future.
2378                  */
2379                 TALLOC_FREE(rec->force_rebalance_nodes);
2380         }
2381
2382         /* Return early if disabled... */
2383         if (ctdb_config.failover_disabled ||
2384             ctdb_op_is_disabled(rec->takeover_run)) {
2385                 return  0;
2386         }
2387
2388         if (interfaces_have_changed(ctdb, rec)) {
2389                 need_takeover_run = true;
2390         }
2391
2392         /* If there are unhosted IPs but this node can host them then
2393          * trigger an IP reallocation */
2394
2395         /* Read *available* IPs from local node */
2396         ret = ctdb_ctrl_get_public_ips_flags(
2397                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2398                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2399         if (ret != 0) {
2400                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2401                 talloc_free(mem_ctx);
2402                 return -1;
2403         }
2404
2405         for (j=0; j<ips->num; j++) {
2406                 if (ips->ips[j].pnn == -1 &&
2407                     nodemap->nodes[pnn].flags == 0) {
2408                         DEBUG(DEBUG_WARNING,
2409                               ("Unassigned IP %s can be served by this node\n",
2410                                ctdb_addr_to_str(&ips->ips[j].addr)));
2411                         need_takeover_run = true;
2412                 }
2413         }
2414
2415         talloc_free(ips);
2416
2417         if (!ctdb->do_checkpublicip) {
2418                 goto done;
2419         }
2420
2421         /* Validate the IP addresses that this node has on network
2422          * interfaces.  If there is an inconsistency between reality
2423          * and the state expected by CTDB then try to fix it by
2424          * triggering an IP reallocation or releasing extraneous IP
2425          * addresses. */
2426
2427         /* Read *known* IPs from local node */
2428         ret = ctdb_ctrl_get_public_ips_flags(
2429                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2430         if (ret != 0) {
2431                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2432                 talloc_free(mem_ctx);
2433                 return -1;
2434         }
2435
2436         for (j=0; j<ips->num; j++) {
2437                 if (ips->ips[j].pnn == pnn) {
2438                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2439                                 DEBUG(DEBUG_ERR,
2440                                       ("Assigned IP %s not on an interface\n",
2441                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2442                                 need_takeover_run = true;
2443                         }
2444                 } else {
2445                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2446                                 DEBUG(DEBUG_ERR,
2447                                       ("IP %s incorrectly on an interface\n",
2448                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2449                                 need_takeover_run = true;
2450                         }
2451                 }
2452         }
2453
2454 done:
2455         if (need_takeover_run) {
2456                 struct ctdb_srvid_message rd;
2457                 TDB_DATA data;
2458
2459                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2460
2461                 ZERO_STRUCT(rd);
2462                 rd.pnn = ctdb->pnn;
2463                 rd.srvid = 0;
2464                 data.dptr = (uint8_t *)&rd;
2465                 data.dsize = sizeof(rd);
2466
2467                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2468                 if (ret != 0) {
2469                         DEBUG(DEBUG_ERR,
2470                               ("Failed to send takeover run request\n"));
2471                 }
2472         }
2473         talloc_free(mem_ctx);
2474         return 0;
2475 }
2476
2477
2478 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2479 {
2480         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2481
2482         if (node_pnn >= ctdb->num_nodes) {
2483                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2484                 return;
2485         }
2486
2487         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2488
2489 }
2490
2491 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2492         struct ctdb_node_map_old *nodemap,
2493         struct ctdb_node_map_old **remote_nodemaps)
2494 {
2495         uint32_t *nodes;
2496
2497         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2498         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2499                                         nodes, 0,
2500                                         CONTROL_TIMEOUT(), false, tdb_null,
2501                                         async_getnodemap_callback,
2502                                         NULL,
2503                                         remote_nodemaps) != 0) {
2504                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2505
2506                 return -1;
2507         }
2508
2509         return 0;
2510 }
2511
2512 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2513                                      TALLOC_CTX *mem_ctx)
2514 {
2515         struct ctdb_context *ctdb = rec->ctdb;
2516         uint32_t pnn = ctdb_get_pnn(ctdb);
2517         struct ctdb_node_map_old *nodemap = rec->nodemap;
2518         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2519         int ret;
2520
2521         /* When recovery daemon is started, recmaster is set to
2522          * "unknown" so it knows to start an election.
2523          */
2524         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2525                 DEBUG(DEBUG_NOTICE,
2526                       ("Initial recovery master set - forcing election\n"));
2527                 force_election(rec, pnn, nodemap);
2528                 return false;
2529         }
2530
2531         /*
2532          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2533          * but we have, then force an election and try to become the new
2534          * recmaster.
2535          */
2536         if (!ctdb_node_has_capabilities(rec->caps,
2537                                         rec->recmaster,
2538                                         CTDB_CAP_RECMASTER) &&
2539             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2540             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2541                 DEBUG(DEBUG_ERR,
2542                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2543                        " but we (node %u) have - force an election\n",
2544                        rec->recmaster, pnn));
2545                 force_election(rec, pnn, nodemap);
2546                 return false;
2547         }
2548
2549         /* Verify that the master node has not been deleted.  This
2550          * should not happen because a node should always be shutdown
2551          * before being deleted, causing a new master to be elected
2552          * before now.  However, if something strange has happened
2553          * then checking here will ensure we don't index beyond the
2554          * end of the nodemap array. */
2555         if (rec->recmaster >= nodemap->num) {
2556                 DEBUG(DEBUG_ERR,
2557                       ("Recmaster node %u has been deleted. Force election\n",
2558                        rec->recmaster));
2559                 force_election(rec, pnn, nodemap);
2560                 return false;
2561         }
2562
2563         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2564         if (nodemap->nodes[rec->recmaster].flags &
2565             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2566                 DEBUG(DEBUG_NOTICE,
2567                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2568                        rec->recmaster));
2569                 force_election(rec, pnn, nodemap);
2570                 return false;
2571         }
2572
2573         /* get nodemap from the recovery master to check if it is inactive */
2574         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2575                                    mem_ctx, &recmaster_nodemap);
2576         if (ret != 0) {
2577                 DEBUG(DEBUG_ERR,
2578                       (__location__
2579                        " Unable to get nodemap from recovery master %u\n",
2580                           rec->recmaster));
2581                 /* No election, just error */
2582                 return false;
2583         }
2584
2585
2586         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2587             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2588                 DEBUG(DEBUG_NOTICE,
2589                       ("Recmaster node %u is inactive. Force election\n",
2590                        rec->recmaster));
2591                 /*
2592                  * update our nodemap to carry the recmaster's notion of
2593                  * its own flags, so that we don't keep freezing the
2594                  * inactive recmaster node...
2595                  */
2596                 nodemap->nodes[rec->recmaster].flags =
2597                         recmaster_nodemap->nodes[rec->recmaster].flags;
2598                 force_election(rec, pnn, nodemap);
2599                 return false;
2600         }
2601
2602         return true;
2603 }
2604
2605 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2606                       TALLOC_CTX *mem_ctx)
2607 {
2608         uint32_t pnn;
2609         struct ctdb_node_map_old *nodemap=NULL;
2610         struct ctdb_node_map_old **remote_nodemaps=NULL;
2611         struct ctdb_vnn_map *vnnmap=NULL;
2612         struct ctdb_vnn_map *remote_vnnmap=NULL;
2613         uint32_t num_lmasters;
2614         int32_t debug_level;
2615         int i, j, ret;
2616         bool self_ban;
2617
2618
2619         /* verify that the main daemon is still running */
2620         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2621                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2622                 exit(-1);
2623         }
2624
2625         /* ping the local daemon to tell it we are alive */
2626         ctdb_ctrl_recd_ping(ctdb);
2627
2628         if (rec->election_timeout) {
2629                 /* an election is in progress */
2630                 return;
2631         }
2632
2633         /* read the debug level from the parent and update locally */
2634         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2635         if (ret !=0) {
2636                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2637                 return;
2638         }
2639         debuglevel_set(debug_level);
2640
2641         /* get relevant tunables */
2642         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2643         if (ret != 0) {
2644                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2645                 return;
2646         }
2647
2648         /* get runstate */
2649         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2650                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2651         if (ret != 0) {
2652                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2653                 return;
2654         }
2655
2656         pnn = ctdb_get_pnn(ctdb);
2657
2658         /* get nodemap */
2659         TALLOC_FREE(rec->nodemap);
2660         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2661         if (ret != 0) {
2662                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2663                 return;
2664         }
2665         nodemap = rec->nodemap;
2666
2667         /* remember our own node flags */
2668         rec->node_flags = nodemap->nodes[pnn].flags;
2669
2670         ban_misbehaving_nodes(rec, &self_ban);
2671         if (self_ban) {
2672                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2673                 return;
2674         }
2675
2676         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2677                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2678         if (ret != 0) {
2679                 D_ERR("Failed to read recmode from local node\n");
2680                 return;
2681         }
2682
2683         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2684            also frozen and that the recmode is set to active.
2685         */
2686         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2687                 /* If this node has become inactive then we want to
2688                  * reduce the chances of it taking over the recovery
2689                  * master role when it becomes active again.  This
2690                  * helps to stabilise the recovery master role so that
2691                  * it stays on the most stable node.
2692                  */
2693                 rec->priority_time = timeval_current();
2694
2695                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2696                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2697
2698                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2699                         if (ret != 0) {
2700                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2701
2702                                 return;
2703                         }
2704                 }
2705                 if (! rec->frozen_on_inactive) {
2706                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2707                                                CTDB_CURRENT_NODE);
2708                         if (ret != 0) {
2709                                 DEBUG(DEBUG_ERR,
2710                                       (__location__ " Failed to freeze node "
2711                                        "in STOPPED or BANNED state\n"));
2712                                 return;
2713                         }
2714
2715                         rec->frozen_on_inactive = true;
2716                 }
2717
2718                 /* If this node is stopped or banned then it is not the recovery
2719                  * master, so don't do anything. This prevents stopped or banned
2720                  * node from starting election and sending unnecessary controls.
2721                  */
2722                 return;
2723         }
2724
2725         rec->frozen_on_inactive = false;
2726
2727         /* Retrieve capabilities from all connected nodes */
2728         ret = update_capabilities(rec, nodemap);
2729         if (ret != 0) {
2730                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2731                 return;
2732         }
2733
2734         if (! validate_recovery_master(rec, mem_ctx)) {
2735                 return;
2736         }
2737
2738         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2739                 /* Check if an IP takeover run is needed and trigger one if
2740                  * necessary */
2741                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2742         }
2743
2744         /* if we are not the recmaster then we do not need to check
2745            if recovery is needed
2746          */
2747         if (pnn != rec->recmaster) {
2748                 return;
2749         }
2750
2751
2752         /* ensure our local copies of flags are right */
2753         ret = update_local_flags(rec, nodemap);
2754         if (ret != 0) {
2755                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2756                 return;
2757         }
2758
2759         if (ctdb->num_nodes != nodemap->num) {
2760                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2761                 ctdb_load_nodes_file(ctdb);
2762                 return;
2763         }
2764
2765         /* verify that all active nodes agree that we are the recmaster */
2766         switch (verify_recmaster(rec, nodemap, pnn)) {
2767         case MONITOR_RECOVERY_NEEDED:
2768                 /* can not happen */
2769                 return;
2770         case MONITOR_ELECTION_NEEDED:
2771                 force_election(rec, pnn, nodemap);
2772                 return;
2773         case MONITOR_OK:
2774                 break;
2775         case MONITOR_FAILED:
2776                 return;
2777         }
2778
2779
2780         /* get the vnnmap */
2781         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2782         if (ret != 0) {
2783                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2784                 return;
2785         }
2786
2787         if (rec->need_recovery) {
2788                 /* a previous recovery didn't finish */
2789                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2790                 return;
2791         }
2792
2793         /* verify that all active nodes are in normal mode 
2794            and not in recovery mode 
2795         */
2796         switch (verify_recmode(ctdb, nodemap)) {
2797         case MONITOR_RECOVERY_NEEDED:
2798                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2799                 return;
2800         case MONITOR_FAILED:
2801                 return;
2802         case MONITOR_ELECTION_NEEDED:
2803                 /* can not happen */
2804         case MONITOR_OK:
2805                 break;
2806         }
2807
2808
2809         if (ctdb->recovery_lock != NULL) {
2810                 /* We must already hold the recovery lock */
2811                 if (!ctdb_recovery_have_lock(rec)) {
2812                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2813                         ctdb_set_culprit(rec, ctdb->pnn);
2814                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2815                         return;
2816                 }
2817         }
2818
2819
2820         /* If recoveries are disabled then there is no use doing any
2821          * nodemap or flags checks.  Recoveries might be disabled due
2822          * to "reloadnodes", so doing these checks might cause an
2823          * unnecessary recovery.  */
2824         if (ctdb_op_is_disabled(rec->recovery)) {
2825                 goto takeover_run_checks;
2826         }
2827
2828         /* get the nodemap for all active remote nodes
2829          */
2830         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2831         if (remote_nodemaps == NULL) {
2832                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2833                 return;
2834         }
2835         for(i=0; i<nodemap->num; i++) {
2836                 remote_nodemaps[i] = NULL;
2837         }
2838         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2839                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2840                 return;
2841         } 
2842
2843         /* verify that all other nodes have the same nodemap as we have
2844         */
2845         for (j=0; j<nodemap->num; j++) {
2846                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2847                         continue;
2848                 }
2849
2850                 if (remote_nodemaps[j] == NULL) {
2851                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2852                         ctdb_set_culprit(rec, j);
2853
2854                         return;
2855                 }
2856
2857                 /* if the nodes disagree on how many nodes there are
2858                    then this is a good reason to try recovery
2859                  */
2860                 if (remote_nodemaps[j]->num != nodemap->num) {
2861                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2862                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2863                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2864                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2865                         return;
2866                 }
2867
2868                 /* if the nodes disagree on which nodes exist and are
2869                    active, then that is also a good reason to do recovery
2870                  */
2871                 for (i=0;i<nodemap->num;i++) {
2872                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2873                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2874                                           nodemap->nodes[j].pnn, i, 
2875                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2876                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2877                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2878                                             vnnmap);
2879                                 return;
2880                         }
2881                 }
2882         }
2883
2884         /*
2885          * Update node flags obtained from each active node. This ensure we have
2886          * up-to-date information for all the nodes.
2887          */
2888         for (j=0; j<nodemap->num; j++) {
2889                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2890                         continue;
2891                 }
2892                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2893         }
2894
2895         for (j=0; j<nodemap->num; j++) {
2896                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2897                         continue;
2898                 }
2899
2900                 /* verify the flags are consistent
2901                 */
2902                 for (i=0; i<nodemap->num; i++) {
2903                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2904                                 continue;
2905                         }
2906                         
2907                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2908                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2909                                   nodemap->nodes[j].pnn, 
2910                                   nodemap->nodes[i].pnn, 
2911                                   remote_nodemaps[j]->nodes[i].flags,
2912                                   nodemap->nodes[i].flags));
2913                                 if (i == j) {
2914                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2915                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2916                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2917                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2918                                                     vnnmap);
2919                                         return;
2920                                 } else {
2921                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2922                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2923                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2924                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2925                                                     vnnmap);
2926                                         return;
2927                                 }
2928                         }
2929                 }
2930         }
2931
2932
2933         /* count how many active nodes there are */
2934         num_lmasters  = 0;
2935         for (i=0; i<nodemap->num; i++) {
2936                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2937                         if (ctdb_node_has_capabilities(rec->caps,
2938                                                        ctdb->nodes[i]->pnn,
2939                                                        CTDB_CAP_LMASTER)) {
2940                                 num_lmasters++;
2941                         }
2942                 }
2943         }
2944
2945
2946         /* There must be the same number of lmasters in the vnn map as
2947          * there are active nodes with the lmaster capability...  or
2948          * do a recovery.
2949          */
2950         if (vnnmap->size != num_lmasters) {
2951                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2952                           vnnmap->size, num_lmasters));
2953                 ctdb_set_culprit(rec, ctdb->pnn);
2954                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2955                 return;
2956         }
2957
2958         /* verify that all active nodes in the nodemap also exist in 
2959            the vnnmap.
2960          */
2961         for (j=0; j<nodemap->num; j++) {
2962                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2963                         continue;
2964                 }
2965                 if (nodemap->nodes[j].pnn == pnn) {
2966                         continue;
2967                 }
2968
2969                 for (i=0; i<vnnmap->size; i++) {
2970                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2971                                 break;
2972                         }
2973                 }
2974                 if (i == vnnmap->size) {
2975                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2976                                   nodemap->nodes[j].pnn));
2977                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2978                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2979                         return;
2980                 }
2981         }
2982
2983         
2984         /* verify that all other nodes have the same vnnmap
2985            and are from the same generation
2986          */
2987         for (j=0; j<nodemap->num; j++) {
2988                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2989                         continue;
2990                 }
2991                 if (nodemap->nodes[j].pnn == pnn) {
2992                         continue;
2993                 }
2994
2995                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2996                                           mem_ctx, &remote_vnnmap);
2997                 if (ret != 0) {
2998                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2999                                   nodemap->nodes[j].pnn));
3000                         return;
3001                 }
3002
3003                 /* verify the vnnmap generation is the same */
3004                 if (vnnmap->generation != remote_vnnmap->generation) {
3005                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3006                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3007                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3008                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3009                         return;
3010                 }
3011
3012                 /* verify the vnnmap size is the same */
3013                 if (vnnmap->size != remote_vnnmap->size) {
3014                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3015                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3016                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3017                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3018                         return;
3019                 }
3020
3021                 /* verify the vnnmap is the same */
3022                 for (i=0;i<vnnmap->size;i++) {
3023                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3024                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3025                                           nodemap->nodes[j].pnn));
3026                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3027                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3028                                             vnnmap);
3029                                 return;
3030                         }
3031                 }
3032         }
3033
3034         /* FIXME: Add remote public IP checking to ensure that nodes
3035          * have the IP addresses that are allocated to them. */
3036
3037 takeover_run_checks:
3038
3039         /* If there are IP takeover runs requested or the previous one
3040          * failed then perform one and notify the waiters */
3041         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3042             (rec->reallocate_requests || rec->need_takeover_run)) {
3043                 process_ipreallocate_requests(ctdb, rec);
3044         }
3045 }
3046
3047 static void recd_sig_term_handler(struct tevent_context *ev,
3048                                   struct tevent_signal *se, int signum,
3049                                   int count, void *dont_care,
3050                                   void *private_data)
3051 {
3052         struct ctdb_recoverd *rec = talloc_get_type_abort(
3053                 private_data, struct ctdb_recoverd);
3054
3055         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
3056         ctdb_recovery_unlock(rec);
3057         exit(0);
3058 }
3059
3060
3061 /*
3062   the main monitoring loop
3063  */
3064 static void monitor_cluster(struct ctdb_context *ctdb)
3065 {
3066         struct tevent_signal *se;
3067         struct ctdb_recoverd *rec;
3068
3069         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3070
3071         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3072         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3073
3074         rec->ctdb = ctdb;
3075         rec->recmaster = CTDB_UNKNOWN_PNN;
3076         rec->recovery_lock_handle = NULL;
3077
3078         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3079         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3080
3081         rec->recovery = ctdb_op_init(rec, "recoveries");
3082         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3083
3084         rec->priority_time = timeval_current();
3085         rec->frozen_on_inactive = false;
3086
3087         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3088                                recd_sig_term_handler, rec);
3089         if (se == NULL) {
3090                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3091                 exit(1);
3092         }
3093
3094         /* register a message port for sending memory dumps */
3095         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3096
3097         /* when a node is assigned banning credits */
3098         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3099                                         banning_handler, rec);
3100
3101         /* register a message port for recovery elections */
3102         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3103
3104         /* when nodes are disabled/enabled */
3105         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3106
3107         /* when we are asked to puch out a flag change */
3108         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3109
3110         /* register a message port for vacuum fetch */
3111         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3112
3113         /* register a message port for reloadnodes  */
3114         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3115
3116         /* register a message port for performing a takeover run */
3117         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3118
3119         /* register a message port for disabling the ip check for a short while */
3120         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3121
3122         /* register a message port for forcing a rebalance of a node next
3123            reallocation */
3124         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3125
3126         /* Register a message port for disabling takeover runs */
3127         ctdb_client_set_message_handler(ctdb,
3128                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3129                                         disable_takeover_runs_handler, rec);
3130
3131         /* Register a message port for disabling recoveries */
3132         ctdb_client_set_message_handler(ctdb,
3133                                         CTDB_SRVID_DISABLE_RECOVERIES,
3134                                         disable_recoveries_handler, rec);
3135
3136         /* register a message port for detaching database */
3137         ctdb_client_set_message_handler(ctdb,
3138                                         CTDB_SRVID_DETACH_DATABASE,
3139                                         detach_database_handler, rec);
3140
3141         for (;;) {
3142                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3143                 struct timeval start;
3144                 double elapsed;
3145
3146                 if (!mem_ctx) {
3147                         DEBUG(DEBUG_CRIT,(__location__
3148                                           " Failed to create temp context\n"));
3149                         exit(-1);
3150                 }
3151
3152                 start = timeval_current();
3153                 main_loop(ctdb, rec, mem_ctx);
3154                 talloc_free(mem_ctx);
3155
3156                 /* we only check for recovery once every second */
3157                 elapsed = timeval_elapsed(&start);
3158                 if (elapsed < ctdb->tunable.recover_interval) {
3159                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3160                                           - elapsed);
3161                 }
3162         }
3163 }
3164
3165 /*
3166   event handler for when the main ctdbd dies
3167  */
3168 static void ctdb_recoverd_parent(struct tevent_context *ev,
3169                                  struct tevent_fd *fde,
3170                                  uint16_t flags, void *private_data)
3171 {
3172         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3173         _exit(1);
3174 }
3175
3176 /*
3177   called regularly to verify that the recovery daemon is still running
3178  */
3179 static void ctdb_check_recd(struct tevent_context *ev,
3180                             struct tevent_timer *te,
3181                             struct timeval yt, void *p)
3182 {
3183         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3184
3185         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3186                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3187
3188                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3189                                  ctdb_restart_recd, ctdb);
3190
3191                 return;
3192         }
3193
3194         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3195                          timeval_current_ofs(30, 0),
3196                          ctdb_check_recd, ctdb);
3197 }
3198
3199 static void recd_sig_child_handler(struct tevent_context *ev,
3200                                    struct tevent_signal *se, int signum,
3201                                    int count, void *dont_care,
3202                                    void *private_data)
3203 {
3204 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3205         int status;
3206         pid_t pid = -1;
3207
3208         while (pid != 0) {
3209                 pid = waitpid(-1, &status, WNOHANG);
3210                 if (pid == -1) {
3211                         if (errno != ECHILD) {
3212                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3213                         }
3214                         return;
3215                 }
3216                 if (pid > 0) {
3217                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3218                 }
3219         }
3220 }
3221
3222 /*
3223   startup the recovery daemon as a child of the main ctdb daemon
3224  */
3225 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3226 {
3227         int fd[2];
3228         struct tevent_signal *se;
3229         struct tevent_fd *fde;
3230         int ret;
3231
3232         if (pipe(fd) != 0) {
3233                 return -1;
3234         }
3235
3236         ctdb->recoverd_pid = ctdb_fork(ctdb);
3237         if (ctdb->recoverd_pid == -1) {
3238                 return -1;
3239         }
3240
3241         if (ctdb->recoverd_pid != 0) {
3242                 talloc_free(ctdb->recd_ctx);
3243                 ctdb->recd_ctx = talloc_new(ctdb);
3244                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3245
3246                 close(fd[0]);
3247                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3248                                  timeval_current_ofs(30, 0),
3249                                  ctdb_check_recd, ctdb);
3250                 return 0;
3251         }
3252
3253         close(fd[1]);
3254
3255         srandom(getpid() ^ time(NULL));
3256
3257         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3258         if (ret != 0) {
3259                 return -1;
3260         }
3261
3262         prctl_set_comment("ctdb_recoverd");
3263         if (switch_from_server_to_client(ctdb) != 0) {
3264                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3265                 exit(1);
3266         }
3267
3268         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3269
3270         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3271                             ctdb_recoverd_parent, &fd[0]);
3272         tevent_fd_set_auto_close(fde);
3273
3274         /* set up a handler to pick up sigchld */
3275         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3276                                recd_sig_child_handler, ctdb);
3277         if (se == NULL) {
3278                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3279                 exit(1);
3280         }
3281
3282         monitor_cluster(ctdb);
3283
3284         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3285         return -1;
3286 }
3287
3288 /*
3289   shutdown the recovery daemon
3290  */
3291 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3292 {
3293         if (ctdb->recoverd_pid == 0) {
3294                 return;
3295         }
3296
3297         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3298         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3299
3300         TALLOC_FREE(ctdb->recd_ctx);
3301         TALLOC_FREE(ctdb->recd_ping_count);
3302 }
3303
3304 static void ctdb_restart_recd(struct tevent_context *ev,
3305                               struct tevent_timer *te,
3306                               struct timeval t, void *private_data)
3307 {
3308         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3309
3310         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3311         ctdb_stop_recoverd(ctdb);
3312         ctdb_start_recoverd(ctdb);
3313 }