473e6cbe8cc8d3bf9c6beda4a8582a2d253d0a00
[herb/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36 #include "lib/util/util_process.h"
37
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/system_socket.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "server/ctdb_config.h"
46
47 #include "ctdb_cluster_mutex.h"
48
49 /* List of SRVID requests that need to be processed */
50 struct srvid_list {
51         struct srvid_list *next, *prev;
52         struct ctdb_srvid_message *request;
53 };
54
55 struct srvid_requests {
56         struct srvid_list *requests;
57 };
58
59 static void srvid_request_reply(struct ctdb_context *ctdb,
60                                 struct ctdb_srvid_message *request,
61                                 TDB_DATA result)
62 {
63         /* Someone that sent srvid==0 does not want a reply */
64         if (request->srvid == 0) {
65                 talloc_free(request);
66                 return;
67         }
68
69         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
70                                      result) == 0) {
71                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
72                                   (unsigned)request->pnn,
73                                   (unsigned long long)request->srvid));
74         } else {
75                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
76                                  (unsigned)request->pnn,
77                                  (unsigned long long)request->srvid));
78         }
79
80         talloc_free(request);
81 }
82
83 static void srvid_requests_reply(struct ctdb_context *ctdb,
84                                  struct srvid_requests **requests,
85                                  TDB_DATA result)
86 {
87         struct srvid_list *r;
88
89         if (*requests == NULL) {
90                 return;
91         }
92
93         for (r = (*requests)->requests; r != NULL; r = r->next) {
94                 srvid_request_reply(ctdb, r->request, result);
95         }
96
97         /* Free the list structure... */
98         TALLOC_FREE(*requests);
99 }
100
101 static void srvid_request_add(struct ctdb_context *ctdb,
102                               struct srvid_requests **requests,
103                               struct ctdb_srvid_message *request)
104 {
105         struct srvid_list *t;
106         int32_t ret;
107         TDB_DATA result;
108
109         if (*requests == NULL) {
110                 *requests = talloc_zero(ctdb, struct srvid_requests);
111                 if (*requests == NULL) {
112                         goto nomem;
113                 }
114         }
115
116         t = talloc_zero(*requests, struct srvid_list);
117         if (t == NULL) {
118                 /* If *requests was just allocated above then free it */
119                 if ((*requests)->requests == NULL) {
120                         TALLOC_FREE(*requests);
121                 }
122                 goto nomem;
123         }
124
125         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
126         DLIST_ADD((*requests)->requests, t);
127
128         return;
129
130 nomem:
131         /* Failed to add the request to the list.  Send a fail. */
132         DEBUG(DEBUG_ERR, (__location__
133                           " Out of memory, failed to queue SRVID request\n"));
134         ret = -ENOMEM;
135         result.dsize = sizeof(ret);
136         result.dptr = (uint8_t *)&ret;
137         srvid_request_reply(ctdb, request, result);
138 }
139
140 /* An abstraction to allow an operation (takeover runs, recoveries,
141  * ...) to be disabled for a given timeout */
142 struct ctdb_op_state {
143         struct tevent_timer *timer;
144         bool in_progress;
145         const char *name;
146 };
147
148 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
149 {
150         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
151
152         if (state != NULL) {
153                 state->in_progress = false;
154                 state->name = name;
155         }
156
157         return state;
158 }
159
160 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
161 {
162         return state->timer != NULL;
163 }
164
165 static bool ctdb_op_begin(struct ctdb_op_state *state)
166 {
167         if (ctdb_op_is_disabled(state)) {
168                 DEBUG(DEBUG_NOTICE,
169                       ("Unable to begin - %s are disabled\n", state->name));
170                 return false;
171         }
172
173         state->in_progress = true;
174         return true;
175 }
176
177 static bool ctdb_op_end(struct ctdb_op_state *state)
178 {
179         return state->in_progress = false;
180 }
181
182 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
183 {
184         return state->in_progress;
185 }
186
187 static void ctdb_op_enable(struct ctdb_op_state *state)
188 {
189         TALLOC_FREE(state->timer);
190 }
191
192 static void ctdb_op_timeout_handler(struct tevent_context *ev,
193                                     struct tevent_timer *te,
194                                     struct timeval yt, void *p)
195 {
196         struct ctdb_op_state *state =
197                 talloc_get_type(p, struct ctdb_op_state);
198
199         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
200         ctdb_op_enable(state);
201 }
202
203 static int ctdb_op_disable(struct ctdb_op_state *state,
204                            struct tevent_context *ev,
205                            uint32_t timeout)
206 {
207         if (timeout == 0) {
208                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
209                 ctdb_op_enable(state);
210                 return 0;
211         }
212
213         if (state->in_progress) {
214                 DEBUG(DEBUG_ERR,
215                       ("Unable to disable %s - in progress\n", state->name));
216                 return -EAGAIN;
217         }
218
219         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
220                             state->name, timeout));
221
222         /* Clear any old timers */
223         talloc_free(state->timer);
224
225         /* Arrange for the timeout to occur */
226         state->timer = tevent_add_timer(ev, state,
227                                         timeval_current_ofs(timeout, 0),
228                                         ctdb_op_timeout_handler, state);
229         if (state->timer == NULL) {
230                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
231                 return -ENOMEM;
232         }
233
234         return 0;
235 }
236
237 struct ctdb_banning_state {
238         uint32_t count;
239         struct timeval last_reported_time;
240 };
241
242 struct ctdb_recovery_lock_handle;
243
244 /*
245   private state of recovery daemon
246  */
247 struct ctdb_recoverd {
248         struct ctdb_context *ctdb;
249         uint32_t recmaster;
250         uint32_t last_culprit_node;
251         struct ctdb_node_map_old *nodemap;
252         struct timeval priority_time;
253         bool need_takeover_run;
254         bool need_recovery;
255         uint32_t node_flags;
256         struct tevent_timer *send_election_te;
257         struct tevent_timer *election_timeout;
258         struct srvid_requests *reallocate_requests;
259         struct ctdb_op_state *takeover_run;
260         struct ctdb_op_state *recovery;
261         struct ctdb_iface_list_old *ifaces;
262         uint32_t *force_rebalance_nodes;
263         struct ctdb_node_capabilities *caps;
264         bool frozen_on_inactive;
265         struct ctdb_recovery_lock_handle *recovery_lock_handle;
266 };
267
268 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
269 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
270
271 static void ctdb_restart_recd(struct tevent_context *ev,
272                               struct tevent_timer *te, struct timeval t,
273                               void *private_data);
274
275 /*
276   ban a node for a period of time
277  */
278 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
279 {
280         int ret;
281         struct ctdb_context *ctdb = rec->ctdb;
282         struct ctdb_ban_state bantime;
283
284         if (!ctdb_validate_pnn(ctdb, pnn)) {
285                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
286                 return;
287         }
288
289         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
290
291         bantime.pnn  = pnn;
292         bantime.time = ban_time;
293
294         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
295         if (ret != 0) {
296                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
297                 return;
298         }
299
300 }
301
302 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
303
304
305 /*
306   remember the trouble maker
307  */
308 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
309 {
310         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
311         struct ctdb_banning_state *ban_state;
312
313         if (culprit > ctdb->num_nodes) {
314                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
315                 return;
316         }
317
318         /* If we are banned or stopped, do not set other nodes as culprits */
319         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
320                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
321                 return;
322         }
323
324         if (ctdb->nodes[culprit]->ban_state == NULL) {
325                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
326                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
327
328                 
329         }
330         ban_state = ctdb->nodes[culprit]->ban_state;
331         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
332                 /* this was the first time in a long while this node
333                    misbehaved so we will forgive any old transgressions.
334                 */
335                 ban_state->count = 0;
336         }
337
338         ban_state->count += count;
339         ban_state->last_reported_time = timeval_current();
340         rec->last_culprit_node = culprit;
341 }
342
343 /*
344   remember the trouble maker
345  */
346 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
347 {
348         ctdb_set_culprit_count(rec, culprit, 1);
349 }
350
351 /*
352   Retrieve capabilities from all connected nodes
353  */
354 static int update_capabilities(struct ctdb_recoverd *rec,
355                                struct ctdb_node_map_old *nodemap)
356 {
357         uint32_t *capp;
358         TALLOC_CTX *tmp_ctx;
359         struct ctdb_node_capabilities *caps;
360         struct ctdb_context *ctdb = rec->ctdb;
361
362         tmp_ctx = talloc_new(rec);
363         CTDB_NO_MEMORY(ctdb, tmp_ctx);
364
365         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
366                                      CONTROL_TIMEOUT(), nodemap);
367
368         if (caps == NULL) {
369                 DEBUG(DEBUG_ERR,
370                       (__location__ " Failed to get node capabilities\n"));
371                 talloc_free(tmp_ctx);
372                 return -1;
373         }
374
375         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
376         if (capp == NULL) {
377                 DEBUG(DEBUG_ERR,
378                       (__location__
379                        " Capabilities don't include current node.\n"));
380                 talloc_free(tmp_ctx);
381                 return -1;
382         }
383         ctdb->capabilities = *capp;
384
385         TALLOC_FREE(rec->caps);
386         rec->caps = talloc_steal(rec, caps);
387
388         talloc_free(tmp_ctx);
389         return 0;
390 }
391
392 /*
393   change recovery mode on all nodes
394  */
395 static int set_recovery_mode(struct ctdb_context *ctdb,
396                              struct ctdb_recoverd *rec,
397                              struct ctdb_node_map_old *nodemap,
398                              uint32_t rec_mode)
399 {
400         TDB_DATA data;
401         uint32_t *nodes;
402         TALLOC_CTX *tmp_ctx;
403
404         tmp_ctx = talloc_new(ctdb);
405         CTDB_NO_MEMORY(ctdb, tmp_ctx);
406
407         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
408
409         data.dsize = sizeof(uint32_t);
410         data.dptr = (unsigned char *)&rec_mode;
411
412         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
413                                         nodes, 0,
414                                         CONTROL_TIMEOUT(),
415                                         false, data,
416                                         NULL, NULL,
417                                         NULL) != 0) {
418                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
419                 talloc_free(tmp_ctx);
420                 return -1;
421         }
422
423         talloc_free(tmp_ctx);
424         return 0;
425 }
426
427 /*
428   ensure all other nodes have attached to any databases that we have
429  */
430 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
431                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
432 {
433         int i, j, db, ret;
434         struct ctdb_dbid_map_old *remote_dbmap;
435
436         /* verify that all other nodes have all our databases */
437         for (j=0; j<nodemap->num; j++) {
438                 /* we don't need to ourself ourselves */
439                 if (nodemap->nodes[j].pnn == pnn) {
440                         continue;
441                 }
442                 /* don't check nodes that are unavailable */
443                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
444                         continue;
445                 }
446
447                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
448                                          mem_ctx, &remote_dbmap);
449                 if (ret != 0) {
450                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
451                         return -1;
452                 }
453
454                 /* step through all local databases */
455                 for (db=0; db<dbmap->num;db++) {
456                         const char *name;
457
458
459                         for (i=0;i<remote_dbmap->num;i++) {
460                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
461                                         break;
462                                 }
463                         }
464                         /* the remote node already have this database */
465                         if (i!=remote_dbmap->num) {
466                                 continue;
467                         }
468                         /* ok so we need to create this database */
469                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
470                                                   dbmap->dbs[db].db_id, mem_ctx,
471                                                   &name);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
474                                 return -1;
475                         }
476                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
477                                                  nodemap->nodes[j].pnn,
478                                                  mem_ctx, name,
479                                                  dbmap->dbs[db].flags, NULL);
480                         if (ret != 0) {
481                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
482                                 return -1;
483                         }
484                 }
485         }
486
487         return 0;
488 }
489
490
491 /*
492   ensure we are attached to any databases that anyone else is attached to
493  */
494 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
495                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
496 {
497         int i, j, db, ret;
498         struct ctdb_dbid_map_old *remote_dbmap;
499
500         /* verify that we have all database any other node has */
501         for (j=0; j<nodemap->num; j++) {
502                 /* we don't need to ourself ourselves */
503                 if (nodemap->nodes[j].pnn == pnn) {
504                         continue;
505                 }
506                 /* don't check nodes that are unavailable */
507                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
508                         continue;
509                 }
510
511                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
512                                          mem_ctx, &remote_dbmap);
513                 if (ret != 0) {
514                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
515                         return -1;
516                 }
517
518                 /* step through all databases on the remote node */
519                 for (db=0; db<remote_dbmap->num;db++) {
520                         const char *name;
521
522                         for (i=0;i<(*dbmap)->num;i++) {
523                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
524                                         break;
525                                 }
526                         }
527                         /* we already have this db locally */
528                         if (i!=(*dbmap)->num) {
529                                 continue;
530                         }
531                         /* ok so we need to create this database and
532                            rebuild dbmap
533                          */
534                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
535                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
536                         if (ret != 0) {
537                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
538                                           nodemap->nodes[j].pnn));
539                                 return -1;
540                         }
541                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
542                                            mem_ctx, name,
543                                            remote_dbmap->dbs[db].flags, NULL);
544                         if (ret != 0) {
545                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
546                                 return -1;
547                         }
548                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
549                         if (ret != 0) {
550                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
551                                 return -1;
552                         }
553                 }
554         }
555
556         return 0;
557 }
558
559 /*
560   update flags on all active nodes
561  */
562 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
563 {
564         int ret;
565
566         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
567                 if (ret != 0) {
568                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
569                 return -1;
570         }
571
572         return 0;
573 }
574
575 /*
576   called when a vacuum fetch has completed - just free it and do the next one
577  */
578 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
579 {
580         talloc_free(state);
581 }
582
583
584 /**
585  * Process one elements of the vacuum fetch list:
586  * Migrate it over to us with the special flag
587  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
588  */
589 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
590                                      uint32_t pnn,
591                                      struct ctdb_rec_data_old *r)
592 {
593         struct ctdb_client_call_state *state;
594         TDB_DATA data;
595         struct ctdb_ltdb_header *hdr;
596         struct ctdb_call call;
597
598         ZERO_STRUCT(call);
599         call.call_id = CTDB_NULL_FUNC;
600         call.flags = CTDB_IMMEDIATE_MIGRATION;
601         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
602
603         call.key.dptr = &r->data[0];
604         call.key.dsize = r->keylen;
605
606         /* ensure we don't block this daemon - just skip a record if we can't get
607            the chainlock */
608         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
609                 return true;
610         }
611
612         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
613         if (data.dptr == NULL) {
614                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
615                 return true;
616         }
617
618         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
619                 free(data.dptr);
620                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
621                 return true;
622         }
623
624         hdr = (struct ctdb_ltdb_header *)data.dptr;
625         if (hdr->dmaster == pnn) {
626                 /* its already local */
627                 free(data.dptr);
628                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
629                 return true;
630         }
631
632         free(data.dptr);
633
634         state = ctdb_call_send(ctdb_db, &call);
635         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
636         if (state == NULL) {
637                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
638                 return false;
639         }
640         state->async.fn = vacuum_fetch_callback;
641         state->async.private_data = NULL;
642
643         return true;
644 }
645
646
647 /*
648   handler for vacuum fetch
649 */
650 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
651                                  void *private_data)
652 {
653         struct ctdb_recoverd *rec = talloc_get_type(
654                 private_data, struct ctdb_recoverd);
655         struct ctdb_context *ctdb = rec->ctdb;
656         struct ctdb_marshall_buffer *recs;
657         int ret, i;
658         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
659         const char *name;
660         struct ctdb_dbid_map_old *dbmap=NULL;
661         uint8_t db_flags = 0;
662         struct ctdb_db_context *ctdb_db;
663         struct ctdb_rec_data_old *r;
664
665         recs = (struct ctdb_marshall_buffer *)data.dptr;
666
667         if (recs->count == 0) {
668                 goto done;
669         }
670
671         /* work out if the database is persistent */
672         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
673         if (ret != 0) {
674                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
675                 goto done;
676         }
677
678         for (i=0;i<dbmap->num;i++) {
679                 if (dbmap->dbs[i].db_id == recs->db_id) {
680                         db_flags = dbmap->dbs[i].flags;
681                         break;
682                 }
683         }
684         if (i == dbmap->num) {
685                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
686                 goto done;
687         }
688
689         /* find the name of this database */
690         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
691                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
692                 goto done;
693         }
694
695         /* attach to it */
696         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
697         if (ctdb_db == NULL) {
698                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
699                 goto done;
700         }
701
702         r = (struct ctdb_rec_data_old *)&recs->data[0];
703         while (recs->count) {
704                 bool ok;
705
706                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
707                 if (!ok) {
708                         break;
709                 }
710
711                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
712                 recs->count--;
713         }
714
715 done:
716         talloc_free(tmp_ctx);
717 }
718
719
720 /*
721  * handler for database detach
722  */
723 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
724                                     void *private_data)
725 {
726         struct ctdb_recoverd *rec = talloc_get_type(
727                 private_data, struct ctdb_recoverd);
728         struct ctdb_context *ctdb = rec->ctdb;
729         uint32_t db_id;
730         struct ctdb_db_context *ctdb_db;
731
732         if (data.dsize != sizeof(db_id)) {
733                 return;
734         }
735         db_id = *(uint32_t *)data.dptr;
736
737         ctdb_db = find_ctdb_db(ctdb, db_id);
738         if (ctdb_db == NULL) {
739                 /* database is not attached */
740                 return;
741         }
742
743         DLIST_REMOVE(ctdb->db_list, ctdb_db);
744
745         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
746                              ctdb_db->db_name));
747         talloc_free(ctdb_db);
748 }
749
750 /*
751   called when ctdb_wait_timeout should finish
752  */
753 static void ctdb_wait_handler(struct tevent_context *ev,
754                               struct tevent_timer *te,
755                               struct timeval yt, void *p)
756 {
757         uint32_t *timed_out = (uint32_t *)p;
758         (*timed_out) = 1;
759 }
760
761 /*
762   wait for a given number of seconds
763  */
764 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
765 {
766         uint32_t timed_out = 0;
767         time_t usecs = (secs - (time_t)secs) * 1000000;
768         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
769                          ctdb_wait_handler, &timed_out);
770         while (!timed_out) {
771                 tevent_loop_once(ctdb->ev);
772         }
773 }
774
775 /*
776   called when an election times out (ends)
777  */
778 static void ctdb_election_timeout(struct tevent_context *ev,
779                                   struct tevent_timer *te,
780                                   struct timeval t, void *p)
781 {
782         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
783         rec->election_timeout = NULL;
784         fast_start = false;
785
786         DEBUG(DEBUG_WARNING,("Election period ended\n"));
787 }
788
789
790 /*
791   wait for an election to finish. It finished election_timeout seconds after
792   the last election packet is received
793  */
794 static void ctdb_wait_election(struct ctdb_recoverd *rec)
795 {
796         struct ctdb_context *ctdb = rec->ctdb;
797         while (rec->election_timeout) {
798                 tevent_loop_once(ctdb->ev);
799         }
800 }
801
802 /*
803   Update our local flags from all remote connected nodes. 
804   This is only run when we are or we belive we are the recovery master
805  */
806 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
807 {
808         int j;
809         struct ctdb_context *ctdb = rec->ctdb;
810         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
811
812         /* get the nodemap for all active remote nodes and verify
813            they are the same as for this node
814          */
815         for (j=0; j<nodemap->num; j++) {
816                 struct ctdb_node_map_old *remote_nodemap=NULL;
817                 int ret;
818
819                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
820                         continue;
821                 }
822                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
823                         continue;
824                 }
825
826                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
827                                            mem_ctx, &remote_nodemap);
828                 if (ret != 0) {
829                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
830                                   nodemap->nodes[j].pnn));
831                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
832                         talloc_free(mem_ctx);
833                         return -1;
834                 }
835                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
836                         /* We should tell our daemon about this so it
837                            updates its flags or else we will log the same 
838                            message again in the next iteration of recovery.
839                            Since we are the recovery master we can just as
840                            well update the flags on all nodes.
841                         */
842                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
843                         if (ret != 0) {
844                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
845                                 return -1;
846                         }
847
848                         /* Update our local copy of the flags in the recovery
849                            daemon.
850                         */
851                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
852                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
853                                  nodemap->nodes[j].flags));
854                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
855                 }
856                 talloc_free(remote_nodemap);
857         }
858         talloc_free(mem_ctx);
859         return 0;
860 }
861
862
863 /* Create a new random generation id.
864    The generation id can not be the INVALID_GENERATION id
865 */
866 static uint32_t new_generation(void)
867 {
868         uint32_t generation;
869
870         while (1) {
871                 generation = random();
872
873                 if (generation != INVALID_GENERATION) {
874                         break;
875                 }
876         }
877
878         return generation;
879 }
880
881 static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
882 {
883         return (rec->recovery_lock_handle != NULL);
884 }
885
886 struct ctdb_recovery_lock_handle {
887         bool done;
888         bool locked;
889         double latency;
890         struct ctdb_cluster_mutex_handle *h;
891 };
892
893 static void take_reclock_handler(char status,
894                                  double latency,
895                                  void *private_data)
896 {
897         struct ctdb_recovery_lock_handle *s =
898                 (struct ctdb_recovery_lock_handle *) private_data;
899
900         s->locked = (status == '0') ;
901
902         /*
903          * If unsuccessful then ensure the process has exited and that
904          * the file descriptor event handler has been cancelled
905          */
906         if (! s->locked) {
907                 TALLOC_FREE(s->h);
908         }
909
910         switch (status) {
911         case '0':
912                 s->latency = latency;
913                 break;
914
915         case '1':
916                 D_ERR("Unable to take recovery lock - contention\n");
917                 break;
918
919         case '2':
920                 D_ERR("Unable to take recovery lock - timeout\n");
921                 break;
922
923         default:
924                 D_ERR("Unable to take recover lock - unknown error\n");
925         }
926
927         s->done = true;
928 }
929
930 static void force_election(struct ctdb_recoverd *rec,
931                            uint32_t pnn,
932                            struct ctdb_node_map_old *nodemap);
933
934 static void lost_reclock_handler(void *private_data)
935 {
936         struct ctdb_recoverd *rec = talloc_get_type_abort(
937                 private_data, struct ctdb_recoverd);
938
939         D_ERR("Recovery lock helper terminated, triggering an election\n");
940         TALLOC_FREE(rec->recovery_lock_handle);
941
942         force_election(rec, ctdb_get_pnn(rec->ctdb), rec->nodemap);
943 }
944
945 static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
946 {
947         struct ctdb_context *ctdb = rec->ctdb;
948         struct ctdb_cluster_mutex_handle *h;
949         struct ctdb_recovery_lock_handle *s;
950
951         s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
952         if (s == NULL) {
953                 DBG_ERR("Memory allocation error\n");
954                 return false;
955         };
956
957         h = ctdb_cluster_mutex(s,
958                                ctdb,
959                                ctdb->recovery_lock,
960                                0,
961                                take_reclock_handler,
962                                s,
963                                lost_reclock_handler,
964                                rec);
965         if (h == NULL) {
966                 talloc_free(s);
967                 return false;
968         }
969
970         rec->recovery_lock_handle = s;
971         s->h = h;
972
973         while (! s->done) {
974                 tevent_loop_once(ctdb->ev);
975         }
976
977         if (! s->locked) {
978                 TALLOC_FREE(rec->recovery_lock_handle);
979                 return false;
980         }
981
982         ctdb_ctrl_report_recd_lock_latency(ctdb,
983                                            CONTROL_TIMEOUT(),
984                                            s->latency);
985
986         return true;
987 }
988
989 static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
990 {
991         if (rec->recovery_lock_handle == NULL) {
992                 return;
993         }
994
995         if (! rec->recovery_lock_handle->done) {
996                 /*
997                  * Taking of recovery lock still in progress.  Free
998                  * the cluster mutex handle to release it but leave
999                  * the recovery lock handle in place to allow taking
1000                  * of the lock to fail.
1001                  */
1002                 D_NOTICE("Cancelling recovery lock\n");
1003                 TALLOC_FREE(rec->recovery_lock_handle->h);
1004                 rec->recovery_lock_handle->done = true;
1005                 rec->recovery_lock_handle->locked = false;
1006                 return;
1007         }
1008
1009         D_NOTICE("Releasing recovery lock\n");
1010         TALLOC_FREE(rec->recovery_lock_handle);
1011 }
1012
1013 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1014 {
1015         struct ctdb_context *ctdb = rec->ctdb;
1016         int i;
1017         struct ctdb_banning_state *ban_state;
1018
1019         *self_ban = false;
1020         for (i=0; i<ctdb->num_nodes; i++) {
1021                 if (ctdb->nodes[i]->ban_state == NULL) {
1022                         continue;
1023                 }
1024                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1025                 if (ban_state->count < 2*ctdb->num_nodes) {
1026                         continue;
1027                 }
1028
1029                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1030                         ctdb->nodes[i]->pnn, ban_state->count,
1031                         ctdb->tunable.recovery_ban_period));
1032                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1033                 ban_state->count = 0;
1034
1035                 /* Banning ourself? */
1036                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1037                         *self_ban = true;
1038                 }
1039         }
1040 }
1041
1042 struct helper_state {
1043         int fd[2];
1044         pid_t pid;
1045         int result;
1046         bool done;
1047 };
1048
1049 static void helper_handler(struct tevent_context *ev,
1050                            struct tevent_fd *fde,
1051                            uint16_t flags, void *private_data)
1052 {
1053         struct helper_state *state = talloc_get_type_abort(
1054                 private_data, struct helper_state);
1055         int ret;
1056
1057         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1058         if (ret != sizeof(state->result)) {
1059                 state->result = EPIPE;
1060         }
1061
1062         state->done = true;
1063 }
1064
1065 static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1066                       const char *prog, const char *arg, const char *type)
1067 {
1068         struct helper_state *state;
1069         struct tevent_fd *fde;
1070         const char **args;
1071         int nargs, ret;
1072         uint32_t recmaster = rec->recmaster;
1073
1074         state = talloc_zero(mem_ctx, struct helper_state);
1075         if (state == NULL) {
1076                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1077                 return -1;
1078         }
1079
1080         state->pid = -1;
1081
1082         ret = pipe(state->fd);
1083         if (ret != 0) {
1084                 DEBUG(DEBUG_ERR,
1085                       ("Failed to create pipe for %s helper\n", type));
1086                 goto fail;
1087         }
1088
1089         set_close_on_exec(state->fd[0]);
1090
1091         nargs = 4;
1092         args = talloc_array(state, const char *, nargs);
1093         if (args == NULL) {
1094                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1095                 goto fail;
1096         }
1097
1098         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1099         if (args[0] == NULL) {
1100                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1101                 goto fail;
1102         }
1103         args[1] = rec->ctdb->daemon.name;
1104         args[2] = arg;
1105         args[3] = NULL;
1106
1107         if (args[2] == NULL) {
1108                 nargs = 3;
1109         }
1110
1111         state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
1112         if (state->pid == -1) {
1113                 DEBUG(DEBUG_ERR,
1114                       ("Failed to create child for %s helper\n", type));
1115                 goto fail;
1116         }
1117
1118         close(state->fd[1]);
1119         state->fd[1] = -1;
1120
1121         state->done = false;
1122
1123         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1124                             TEVENT_FD_READ, helper_handler, state);
1125         if (fde == NULL) {
1126                 goto fail;
1127         }
1128         tevent_fd_set_auto_close(fde);
1129
1130         while (!state->done) {
1131                 tevent_loop_once(rec->ctdb->ev);
1132
1133                 /* If recmaster changes, we have lost election */
1134                 if (recmaster != rec->recmaster) {
1135                         D_ERR("Recmaster changed to %u, aborting %s\n",
1136                               rec->recmaster, type);
1137                         state->result = 1;
1138                         break;
1139                 }
1140         }
1141
1142         close(state->fd[0]);
1143         state->fd[0] = -1;
1144
1145         if (state->result != 0) {
1146                 goto fail;
1147         }
1148
1149         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1150         talloc_free(state);
1151         return 0;
1152
1153 fail:
1154         if (state->fd[0] != -1) {
1155                 close(state->fd[0]);
1156         }
1157         if (state->fd[1] != -1) {
1158                 close(state->fd[1]);
1159         }
1160         if (state->pid != -1) {
1161                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1162         }
1163         talloc_free(state);
1164         return -1;
1165 }
1166
1167
1168 static int ctdb_takeover(struct ctdb_recoverd *rec,
1169                          uint32_t *force_rebalance_nodes)
1170 {
1171         static char prog[PATH_MAX+1] = "";
1172         char *arg;
1173         int i, ret;
1174
1175         if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
1176                              "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
1177                              "ctdb_takeover_helper")) {
1178                 ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
1179         }
1180
1181         arg = NULL;
1182         for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
1183                 uint32_t pnn = force_rebalance_nodes[i];
1184                 if (arg == NULL) {
1185                         arg = talloc_asprintf(rec, "%u", pnn);
1186                 } else {
1187                         arg = talloc_asprintf_append(arg, ",%u", pnn);
1188                 }
1189                 if (arg == NULL) {
1190                         DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1191                         return -1;
1192                 }
1193         }
1194
1195         if (ctdb_config.failover_disabled) {
1196                 ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
1197                 if (ret != 0) {
1198                         D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
1199                         return -1;
1200                 }
1201         }
1202
1203         return helper_run(rec, rec, prog, arg, "takeover");
1204 }
1205
1206 static bool do_takeover_run(struct ctdb_recoverd *rec,
1207                             struct ctdb_node_map_old *nodemap)
1208 {
1209         uint32_t *nodes = NULL;
1210         struct ctdb_disable_message dtr;
1211         TDB_DATA data;
1212         int i;
1213         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1214         int ret;
1215         bool ok;
1216
1217         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1218
1219         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1220                 DEBUG(DEBUG_ERR, (__location__
1221                                   " takeover run already in progress \n"));
1222                 ok = false;
1223                 goto done;
1224         }
1225
1226         if (!ctdb_op_begin(rec->takeover_run)) {
1227                 ok = false;
1228                 goto done;
1229         }
1230
1231         /* Disable IP checks (takeover runs, really) on other nodes
1232          * while doing this takeover run.  This will stop those other
1233          * nodes from triggering takeover runs when think they should
1234          * be hosting an IP but it isn't yet on an interface.  Don't
1235          * wait for replies since a failure here might cause some
1236          * noise in the logs but will not actually cause a problem.
1237          */
1238         ZERO_STRUCT(dtr);
1239         dtr.srvid = 0; /* No reply */
1240         dtr.pnn = -1;
1241
1242         data.dptr  = (uint8_t*)&dtr;
1243         data.dsize = sizeof(dtr);
1244
1245         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1246
1247         /* Disable for 60 seconds.  This can be a tunable later if
1248          * necessary.
1249          */
1250         dtr.timeout = 60;
1251         for (i = 0; i < talloc_array_length(nodes); i++) {
1252                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1253                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1254                                              data) != 0) {
1255                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1256                 }
1257         }
1258
1259         ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
1260
1261         /* Reenable takeover runs and IP checks on other nodes */
1262         dtr.timeout = 0;
1263         for (i = 0; i < talloc_array_length(nodes); i++) {
1264                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1265                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1266                                              data) != 0) {
1267                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1268                 }
1269         }
1270
1271         if (ret != 0) {
1272                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1273                 ok = false;
1274                 goto done;
1275         }
1276
1277         ok = true;
1278         /* Takeover run was successful so clear force rebalance targets */
1279         if (rebalance_nodes == rec->force_rebalance_nodes) {
1280                 TALLOC_FREE(rec->force_rebalance_nodes);
1281         } else {
1282                 DEBUG(DEBUG_WARNING,
1283                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1284         }
1285 done:
1286         rec->need_takeover_run = !ok;
1287         talloc_free(nodes);
1288         ctdb_op_end(rec->takeover_run);
1289
1290         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1291         return ok;
1292 }
1293
1294 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1295 {
1296         static char prog[PATH_MAX+1] = "";
1297         const char *arg;
1298
1299         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1300                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1301                              "ctdb_recovery_helper")) {
1302                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1303         }
1304
1305         arg = talloc_asprintf(mem_ctx, "%u", new_generation());
1306         if (arg == NULL) {
1307                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1308                 return -1;
1309         }
1310
1311         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1312
1313         return helper_run(rec, mem_ctx, prog, arg, "recovery");
1314 }
1315
1316 /*
1317   we are the recmaster, and recovery is needed - start a recovery run
1318  */
1319 static int do_recovery(struct ctdb_recoverd *rec,
1320                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1321                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1322 {
1323         struct ctdb_context *ctdb = rec->ctdb;
1324         int i, ret;
1325         struct ctdb_dbid_map_old *dbmap;
1326         bool self_ban;
1327
1328         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1329
1330         /* Check if the current node is still the recmaster.  It's possible that
1331          * re-election has changed the recmaster.
1332          */
1333         if (pnn != rec->recmaster) {
1334                 DEBUG(DEBUG_NOTICE,
1335                       ("Recovery master changed to %u, aborting recovery\n",
1336                        rec->recmaster));
1337                 return -1;
1338         }
1339
1340         /* if recovery fails, force it again */
1341         rec->need_recovery = true;
1342
1343         if (!ctdb_op_begin(rec->recovery)) {
1344                 return -1;
1345         }
1346
1347         if (rec->election_timeout) {
1348                 /* an election is in progress */
1349                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1350                 goto fail;
1351         }
1352
1353         ban_misbehaving_nodes(rec, &self_ban);
1354         if (self_ban) {
1355                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1356                 goto fail;
1357         }
1358
1359         if (ctdb->recovery_lock != NULL) {
1360                 if (ctdb_recovery_have_lock(rec)) {
1361                         D_NOTICE("Already holding recovery lock\n");
1362                 } else {
1363                         bool ok;
1364
1365                         D_NOTICE("Attempting to take recovery lock (%s)\n",
1366                                  ctdb->recovery_lock);
1367
1368                         ok = ctdb_recovery_lock(rec);
1369                         if (! ok) {
1370                                 D_ERR("Unable to take recovery lock\n");
1371
1372                                 if (pnn != rec->recmaster) {
1373                                         D_NOTICE("Recovery master changed to %u,"
1374                                                  " aborting recovery\n",
1375                                                  rec->recmaster);
1376                                         rec->need_recovery = false;
1377                                         goto fail;
1378                                 }
1379
1380                                 if (ctdb->runstate ==
1381                                     CTDB_RUNSTATE_FIRST_RECOVERY) {
1382                                         /*
1383                                          * First recovery?  Perhaps
1384                                          * current node does not yet
1385                                          * know who the recmaster is.
1386                                          */
1387                                         D_ERR("Retrying recovery\n");
1388                                         goto fail;
1389                                 }
1390
1391                                 D_ERR("Abort recovery, "
1392                                       "ban this node for %u seconds\n",
1393                                       ctdb->tunable.recovery_ban_period);
1394                                 ctdb_ban_node(rec,
1395                                               pnn,
1396                                               ctdb->tunable.recovery_ban_period);
1397                                 goto fail;
1398                         }
1399                         D_NOTICE("Recovery lock taken successfully\n");
1400                 }
1401         }
1402
1403         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1404
1405         /* get a list of all databases */
1406         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1407         if (ret != 0) {
1408                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1409                 goto fail;
1410         }
1411
1412         /* we do the db creation before we set the recovery mode, so the freeze happens
1413            on all databases we will be dealing with. */
1414
1415         /* verify that we have all the databases any other node has */
1416         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1417         if (ret != 0) {
1418                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1419                 goto fail;
1420         }
1421
1422         /* verify that all other nodes have all our databases */
1423         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1424         if (ret != 0) {
1425                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1426                 goto fail;
1427         }
1428         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1429
1430
1431         /* Retrieve capabilities from all connected nodes */
1432         ret = update_capabilities(rec, nodemap);
1433         if (ret!=0) {
1434                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1435                 return -1;
1436         }
1437
1438         /*
1439           update all nodes to have the same flags that we have
1440          */
1441         for (i=0;i<nodemap->num;i++) {
1442                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1443                         continue;
1444                 }
1445
1446                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1447                 if (ret != 0) {
1448                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1449                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1450                         } else {
1451                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1452                                 return -1;
1453                         }
1454                 }
1455         }
1456
1457         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1458
1459         ret = db_recovery_parallel(rec, mem_ctx);
1460         if (ret != 0) {
1461                 goto fail;
1462         }
1463
1464         do_takeover_run(rec, nodemap);
1465
1466         /* send a message to all clients telling them that the cluster 
1467            has been reconfigured */
1468         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
1469                                        CTDB_SRVID_RECONFIGURE, tdb_null);
1470         if (ret != 0) {
1471                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
1472                 goto fail;
1473         }
1474
1475         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1476
1477         rec->need_recovery = false;
1478         ctdb_op_end(rec->recovery);
1479
1480         /* we managed to complete a full recovery, make sure to forgive
1481            any past sins by the nodes that could now participate in the
1482            recovery.
1483         */
1484         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1485         for (i=0;i<nodemap->num;i++) {
1486                 struct ctdb_banning_state *ban_state;
1487
1488                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1489                         continue;
1490                 }
1491
1492                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1493                 if (ban_state == NULL) {
1494                         continue;
1495                 }
1496
1497                 ban_state->count = 0;
1498         }
1499
1500         /* We just finished a recovery successfully.
1501            We now wait for rerecovery_timeout before we allow
1502            another recovery to take place.
1503         */
1504         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1505         ctdb_op_disable(rec->recovery, ctdb->ev,
1506                         ctdb->tunable.rerecovery_timeout);
1507         return 0;
1508
1509 fail:
1510         ctdb_op_end(rec->recovery);
1511         return -1;
1512 }
1513
1514
1515 /*
1516   elections are won by first checking the number of connected nodes, then
1517   the priority time, then the pnn
1518  */
1519 struct election_message {
1520         uint32_t num_connected;
1521         struct timeval priority_time;
1522         uint32_t pnn;
1523         uint32_t node_flags;
1524 };
1525
1526 /*
1527   form this nodes election data
1528  */
1529 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1530 {
1531         int ret, i;
1532         struct ctdb_node_map_old *nodemap;
1533         struct ctdb_context *ctdb = rec->ctdb;
1534
1535         ZERO_STRUCTP(em);
1536
1537         em->pnn = rec->ctdb->pnn;
1538         em->priority_time = rec->priority_time;
1539
1540         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1541         if (ret != 0) {
1542                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1543                 return;
1544         }
1545
1546         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1547         em->node_flags = rec->node_flags;
1548
1549         for (i=0;i<nodemap->num;i++) {
1550                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1551                         em->num_connected++;
1552                 }
1553         }
1554
1555         /* we shouldnt try to win this election if we cant be a recmaster */
1556         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1557                 em->num_connected = 0;
1558                 em->priority_time = timeval_current();
1559         }
1560
1561         talloc_free(nodemap);
1562 }
1563
1564 /*
1565   see if the given election data wins
1566  */
1567 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1568 {
1569         struct election_message myem;
1570         int cmp = 0;
1571
1572         ctdb_election_data(rec, &myem);
1573
1574         /* we cant win if we don't have the recmaster capability */
1575         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1576                 return false;
1577         }
1578
1579         /* we cant win if we are banned */
1580         if (rec->node_flags & NODE_FLAGS_BANNED) {
1581                 return false;
1582         }
1583
1584         /* we cant win if we are stopped */
1585         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1586                 return false;
1587         }
1588
1589         /* we will automatically win if the other node is banned */
1590         if (em->node_flags & NODE_FLAGS_BANNED) {
1591                 return true;
1592         }
1593
1594         /* we will automatically win if the other node is banned */
1595         if (em->node_flags & NODE_FLAGS_STOPPED) {
1596                 return true;
1597         }
1598
1599         /* then the longest running node */
1600         if (cmp == 0) {
1601                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1602         }
1603
1604         if (cmp == 0) {
1605                 cmp = (int)myem.pnn - (int)em->pnn;
1606         }
1607
1608         return cmp > 0;
1609 }
1610
1611 /*
1612   send out an election request
1613  */
1614 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1615 {
1616         int ret;
1617         TDB_DATA election_data;
1618         struct election_message emsg;
1619         uint64_t srvid;
1620         struct ctdb_context *ctdb = rec->ctdb;
1621
1622         srvid = CTDB_SRVID_ELECTION;
1623
1624         ctdb_election_data(rec, &emsg);
1625
1626         election_data.dsize = sizeof(struct election_message);
1627         election_data.dptr  = (unsigned char *)&emsg;
1628
1629
1630         /* first we assume we will win the election and set 
1631            recoverymaster to be ourself on the current node
1632          */
1633         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1634                                      CTDB_CURRENT_NODE, pnn);
1635         if (ret != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1637                 return -1;
1638         }
1639         rec->recmaster = pnn;
1640
1641         /* send an election message to all active nodes */
1642         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1643         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1644 }
1645
1646 /*
1647   we think we are winning the election - send a broadcast election request
1648  */
1649 static void election_send_request(struct tevent_context *ev,
1650                                   struct tevent_timer *te,
1651                                   struct timeval t, void *p)
1652 {
1653         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1654         int ret;
1655
1656         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1657         if (ret != 0) {
1658                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1659         }
1660
1661         TALLOC_FREE(rec->send_election_te);
1662 }
1663
1664 /*
1665   handler for memory dumps
1666 */
1667 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1668 {
1669         struct ctdb_recoverd *rec = talloc_get_type(
1670                 private_data, struct ctdb_recoverd);
1671         struct ctdb_context *ctdb = rec->ctdb;
1672         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1673         TDB_DATA *dump;
1674         int ret;
1675         struct ctdb_srvid_message *rd;
1676
1677         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1678                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1679                 talloc_free(tmp_ctx);
1680                 return;
1681         }
1682         rd = (struct ctdb_srvid_message *)data.dptr;
1683
1684         dump = talloc_zero(tmp_ctx, TDB_DATA);
1685         if (dump == NULL) {
1686                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1687                 talloc_free(tmp_ctx);
1688                 return;
1689         }
1690         ret = ctdb_dump_memory(ctdb, dump);
1691         if (ret != 0) {
1692                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1693                 talloc_free(tmp_ctx);
1694                 return;
1695         }
1696
1697 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1698
1699         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1700         if (ret != 0) {
1701                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1702                 talloc_free(tmp_ctx);
1703                 return;
1704         }
1705
1706         talloc_free(tmp_ctx);
1707 }
1708
1709 /*
1710   handler for reload_nodes
1711 */
1712 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1713                                  void *private_data)
1714 {
1715         struct ctdb_recoverd *rec = talloc_get_type(
1716                 private_data, struct ctdb_recoverd);
1717
1718         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1719
1720         ctdb_load_nodes_file(rec->ctdb);
1721 }
1722
1723
1724 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1725                                         void *private_data)
1726 {
1727         struct ctdb_recoverd *rec = talloc_get_type(
1728                 private_data, struct ctdb_recoverd);
1729         struct ctdb_context *ctdb = rec->ctdb;
1730         uint32_t pnn;
1731         uint32_t *t;
1732         int len;
1733
1734         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1735                 return;
1736         }
1737
1738         if (data.dsize != sizeof(uint32_t)) {
1739                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1740                 return;
1741         }
1742
1743         pnn = *(uint32_t *)&data.dptr[0];
1744
1745         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1746
1747         /* Copy any existing list of nodes.  There's probably some
1748          * sort of realloc variant that will do this but we need to
1749          * make sure that freeing the old array also cancels the timer
1750          * event for the timeout... not sure if realloc will do that.
1751          */
1752         len = (rec->force_rebalance_nodes != NULL) ?
1753                 talloc_array_length(rec->force_rebalance_nodes) :
1754                 0;
1755
1756         /* This allows duplicates to be added but they don't cause
1757          * harm.  A call to add a duplicate PNN arguably means that
1758          * the timeout should be reset, so this is the simplest
1759          * solution.
1760          */
1761         t = talloc_zero_array(rec, uint32_t, len+1);
1762         CTDB_NO_MEMORY_VOID(ctdb, t);
1763         if (len > 0) {
1764                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1765         }
1766         t[len] = pnn;
1767
1768         talloc_free(rec->force_rebalance_nodes);
1769
1770         rec->force_rebalance_nodes = t;
1771 }
1772
1773
1774
1775 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1776                                     TDB_DATA data,
1777                                     struct ctdb_op_state *op_state)
1778 {
1779         struct ctdb_disable_message *r;
1780         uint32_t timeout;
1781         TDB_DATA result;
1782         int32_t ret = 0;
1783
1784         /* Validate input data */
1785         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1786                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1787                                  "expecting %lu\n", (long unsigned)data.dsize,
1788                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
1789                 return;
1790         }
1791         if (data.dptr == NULL) {
1792                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1793                 return;
1794         }
1795
1796         r = (struct ctdb_disable_message *)data.dptr;
1797         timeout = r->timeout;
1798
1799         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1800         if (ret != 0) {
1801                 goto done;
1802         }
1803
1804         /* Returning our PNN tells the caller that we succeeded */
1805         ret = ctdb_get_pnn(ctdb);
1806 done:
1807         result.dsize = sizeof(int32_t);
1808         result.dptr  = (uint8_t *)&ret;
1809         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1810 }
1811
1812 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1813                                           void *private_data)
1814 {
1815         struct ctdb_recoverd *rec = talloc_get_type(
1816                 private_data, struct ctdb_recoverd);
1817
1818         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1819 }
1820
1821 /* Backward compatibility for this SRVID */
1822 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1823                                      void *private_data)
1824 {
1825         struct ctdb_recoverd *rec = talloc_get_type(
1826                 private_data, struct ctdb_recoverd);
1827         uint32_t timeout;
1828
1829         if (data.dsize != sizeof(uint32_t)) {
1830                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1831                                  "expecting %lu\n", (long unsigned)data.dsize,
1832                                  (long unsigned)sizeof(uint32_t)));
1833                 return;
1834         }
1835         if (data.dptr == NULL) {
1836                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1837                 return;
1838         }
1839
1840         timeout = *((uint32_t *)data.dptr);
1841
1842         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1843 }
1844
1845 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1846                                        void *private_data)
1847 {
1848         struct ctdb_recoverd *rec = talloc_get_type(
1849                 private_data, struct ctdb_recoverd);
1850
1851         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1852 }
1853
1854 /*
1855   handler for ip reallocate, just add it to the list of requests and 
1856   handle this later in the monitor_cluster loop so we do not recurse
1857   with other requests to takeover_run()
1858 */
1859 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1860                                   void *private_data)
1861 {
1862         struct ctdb_srvid_message *request;
1863         struct ctdb_recoverd *rec = talloc_get_type(
1864                 private_data, struct ctdb_recoverd);
1865
1866         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1867                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1868                 return;
1869         }
1870
1871         request = (struct ctdb_srvid_message *)data.dptr;
1872
1873         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1874 }
1875
1876 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1877                                           struct ctdb_recoverd *rec)
1878 {
1879         TDB_DATA result;
1880         int32_t ret;
1881         struct srvid_requests *current;
1882
1883         /* Only process requests that are currently pending.  More
1884          * might come in while the takeover run is in progress and
1885          * they will need to be processed later since they might
1886          * be in response flag changes.
1887          */
1888         current = rec->reallocate_requests;
1889         rec->reallocate_requests = NULL;
1890
1891         if (do_takeover_run(rec, rec->nodemap)) {
1892                 ret = ctdb_get_pnn(ctdb);
1893         } else {
1894                 ret = -1;
1895         }
1896
1897         result.dsize = sizeof(int32_t);
1898         result.dptr  = (uint8_t *)&ret;
1899
1900         srvid_requests_reply(ctdb, &current, result);
1901 }
1902
1903 /*
1904  * handler for assigning banning credits
1905  */
1906 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1907 {
1908         struct ctdb_recoverd *rec = talloc_get_type(
1909                 private_data, struct ctdb_recoverd);
1910         uint32_t ban_pnn;
1911
1912         /* Ignore if we are not recmaster */
1913         if (rec->ctdb->pnn != rec->recmaster) {
1914                 return;
1915         }
1916
1917         if (data.dsize != sizeof(uint32_t)) {
1918                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
1919                                   data.dsize));
1920                 return;
1921         }
1922
1923         ban_pnn = *(uint32_t *)data.dptr;
1924
1925         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1926 }
1927
1928 /*
1929   handler for recovery master elections
1930 */
1931 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1932 {
1933         struct ctdb_recoverd *rec = talloc_get_type(
1934                 private_data, struct ctdb_recoverd);
1935         struct ctdb_context *ctdb = rec->ctdb;
1936         int ret;
1937         struct election_message *em = (struct election_message *)data.dptr;
1938
1939         /* Ignore election packets from ourself */
1940         if (ctdb->pnn == em->pnn) {
1941                 return;
1942         }
1943
1944         /* we got an election packet - update the timeout for the election */
1945         talloc_free(rec->election_timeout);
1946         rec->election_timeout = tevent_add_timer(
1947                         ctdb->ev, ctdb,
1948                         fast_start ?
1949                                 timeval_current_ofs(0, 500000) :
1950                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1951                         ctdb_election_timeout, rec);
1952
1953         /* someone called an election. check their election data
1954            and if we disagree and we would rather be the elected node, 
1955            send a new election message to all other nodes
1956          */
1957         if (ctdb_election_win(rec, em)) {
1958                 if (!rec->send_election_te) {
1959                         rec->send_election_te = tevent_add_timer(
1960                                         ctdb->ev, rec,
1961                                         timeval_current_ofs(0, 500000),
1962                                         election_send_request, rec);
1963                 }
1964                 return;
1965         }
1966
1967         /* we didn't win */
1968         TALLOC_FREE(rec->send_election_te);
1969
1970         /* Release the recovery lock file */
1971         if (ctdb_recovery_have_lock(rec)) {
1972                 ctdb_recovery_unlock(rec);
1973         }
1974
1975         /* ok, let that guy become recmaster then */
1976         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1977                                      CTDB_CURRENT_NODE, em->pnn);
1978         if (ret != 0) {
1979                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
1980                 return;
1981         }
1982         rec->recmaster = em->pnn;
1983
1984         return;
1985 }
1986
1987
1988 /*
1989   force the start of the election process
1990  */
1991 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1992                            struct ctdb_node_map_old *nodemap)
1993 {
1994         int ret;
1995         struct ctdb_context *ctdb = rec->ctdb;
1996
1997         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1998
1999         /* set all nodes to recovery mode to stop all internode traffic */
2000         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2001         if (ret != 0) {
2002                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2003                 return;
2004         }
2005
2006         talloc_free(rec->election_timeout);
2007         rec->election_timeout = tevent_add_timer(
2008                         ctdb->ev, ctdb,
2009                         fast_start ?
2010                                 timeval_current_ofs(0, 500000) :
2011                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2012                         ctdb_election_timeout, rec);
2013
2014         ret = send_election_request(rec, pnn);
2015         if (ret!=0) {
2016                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2017                 return;
2018         }
2019
2020         /* wait for a few seconds to collect all responses */
2021         ctdb_wait_election(rec);
2022 }
2023
2024
2025
2026 /*
2027   handler for when a node changes its flags
2028 */
2029 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2030 {
2031         struct ctdb_recoverd *rec = talloc_get_type(
2032                 private_data, struct ctdb_recoverd);
2033         struct ctdb_context *ctdb = rec->ctdb;
2034         int ret;
2035         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2036         struct ctdb_node_map_old *nodemap=NULL;
2037         TALLOC_CTX *tmp_ctx;
2038         int i;
2039
2040         if (data.dsize != sizeof(*c)) {
2041                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2042                 return;
2043         }
2044
2045         tmp_ctx = talloc_new(ctdb);
2046         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2047
2048         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2049         if (ret != 0) {
2050                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2051                 talloc_free(tmp_ctx);
2052                 return;         
2053         }
2054
2055
2056         for (i=0;i<nodemap->num;i++) {
2057                 if (nodemap->nodes[i].pnn == c->pnn) break;
2058         }
2059
2060         if (i == nodemap->num) {
2061                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2062                 talloc_free(tmp_ctx);
2063                 return;
2064         }
2065
2066         if (c->old_flags != c->new_flags) {
2067                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2068         }
2069
2070         nodemap->nodes[i].flags = c->new_flags;
2071
2072         talloc_free(tmp_ctx);
2073 }
2074
2075 /*
2076   handler for when we need to push out flag changes ot all other nodes
2077 */
2078 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2079                                void *private_data)
2080 {
2081         struct ctdb_recoverd *rec = talloc_get_type(
2082                 private_data, struct ctdb_recoverd);
2083         struct ctdb_context *ctdb = rec->ctdb;
2084         int ret;
2085         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2086         struct ctdb_node_map_old *nodemap=NULL;
2087         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2088         uint32_t *nodes;
2089
2090         /* read the node flags from the recmaster */
2091         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2092                                    tmp_ctx, &nodemap);
2093         if (ret != 0) {
2094                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2095                 talloc_free(tmp_ctx);
2096                 return;
2097         }
2098         if (c->pnn >= nodemap->num) {
2099                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2100                 talloc_free(tmp_ctx);
2101                 return;
2102         }
2103
2104         /* send the flags update to all connected nodes */
2105         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2106
2107         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2108                                       nodes, 0, CONTROL_TIMEOUT(),
2109                                       false, data,
2110                                       NULL, NULL,
2111                                       NULL) != 0) {
2112                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2113
2114                 talloc_free(tmp_ctx);
2115                 return;
2116         }
2117
2118         talloc_free(tmp_ctx);
2119 }
2120
2121
2122 struct verify_recmode_normal_data {
2123         uint32_t count;
2124         enum monitor_result status;
2125 };
2126
2127 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2128 {
2129         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2130
2131
2132         /* one more node has responded with recmode data*/
2133         rmdata->count--;
2134
2135         /* if we failed to get the recmode, then return an error and let
2136            the main loop try again.
2137         */
2138         if (state->state != CTDB_CONTROL_DONE) {
2139                 if (rmdata->status == MONITOR_OK) {
2140                         rmdata->status = MONITOR_FAILED;
2141                 }
2142                 return;
2143         }
2144
2145         /* if we got a response, then the recmode will be stored in the
2146            status field
2147         */
2148         if (state->status != CTDB_RECOVERY_NORMAL) {
2149                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2150                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2151         }
2152
2153         return;
2154 }
2155
2156
2157 /* verify that all nodes are in normal recovery mode */
2158 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2159 {
2160         struct verify_recmode_normal_data *rmdata;
2161         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2162         struct ctdb_client_control_state *state;
2163         enum monitor_result status;
2164         int j;
2165         
2166         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2167         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2168         rmdata->count  = 0;
2169         rmdata->status = MONITOR_OK;
2170
2171         /* loop over all active nodes and send an async getrecmode call to 
2172            them*/
2173         for (j=0; j<nodemap->num; j++) {
2174                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2175                         continue;
2176                 }
2177                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2178                                         CONTROL_TIMEOUT(), 
2179                                         nodemap->nodes[j].pnn);
2180                 if (state == NULL) {
2181                         /* we failed to send the control, treat this as 
2182                            an error and try again next iteration
2183                         */                      
2184                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2185                         talloc_free(mem_ctx);
2186                         return MONITOR_FAILED;
2187                 }
2188
2189                 /* set up the callback functions */
2190                 state->async.fn = verify_recmode_normal_callback;
2191                 state->async.private_data = rmdata;
2192
2193                 /* one more control to wait for to complete */
2194                 rmdata->count++;
2195         }
2196
2197
2198         /* now wait for up to the maximum number of seconds allowed
2199            or until all nodes we expect a response from has replied
2200         */
2201         while (rmdata->count > 0) {
2202                 tevent_loop_once(ctdb->ev);
2203         }
2204
2205         status = rmdata->status;
2206         talloc_free(mem_ctx);
2207         return status;
2208 }
2209
2210
2211 struct verify_recmaster_data {
2212         struct ctdb_recoverd *rec;
2213         uint32_t count;
2214         uint32_t pnn;
2215         enum monitor_result status;
2216 };
2217
2218 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2219 {
2220         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2221
2222
2223         /* one more node has responded with recmaster data*/
2224         rmdata->count--;
2225
2226         /* if we failed to get the recmaster, then return an error and let
2227            the main loop try again.
2228         */
2229         if (state->state != CTDB_CONTROL_DONE) {
2230                 if (rmdata->status == MONITOR_OK) {
2231                         rmdata->status = MONITOR_FAILED;
2232                 }
2233                 return;
2234         }
2235
2236         /* if we got a response, then the recmaster will be stored in the
2237            status field
2238         */
2239         if (state->status != rmdata->pnn) {
2240                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2241                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2242                 rmdata->status = MONITOR_ELECTION_NEEDED;
2243         }
2244
2245         return;
2246 }
2247
2248
2249 /* verify that all nodes agree that we are the recmaster */
2250 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2251 {
2252         struct ctdb_context *ctdb = rec->ctdb;
2253         struct verify_recmaster_data *rmdata;
2254         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2255         struct ctdb_client_control_state *state;
2256         enum monitor_result status;
2257         int j;
2258         
2259         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2260         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2261         rmdata->rec    = rec;
2262         rmdata->count  = 0;
2263         rmdata->pnn    = pnn;
2264         rmdata->status = MONITOR_OK;
2265
2266         /* loop over all active nodes and send an async getrecmaster call to
2267            them*/
2268         for (j=0; j<nodemap->num; j++) {
2269                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2270                         continue;
2271                 }
2272                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2273                         continue;
2274                 }
2275                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2276                                         CONTROL_TIMEOUT(),
2277                                         nodemap->nodes[j].pnn);
2278                 if (state == NULL) {
2279                         /* we failed to send the control, treat this as 
2280                            an error and try again next iteration
2281                         */                      
2282                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2283                         talloc_free(mem_ctx);
2284                         return MONITOR_FAILED;
2285                 }
2286
2287                 /* set up the callback functions */
2288                 state->async.fn = verify_recmaster_callback;
2289                 state->async.private_data = rmdata;
2290
2291                 /* one more control to wait for to complete */
2292                 rmdata->count++;
2293         }
2294
2295
2296         /* now wait for up to the maximum number of seconds allowed
2297            or until all nodes we expect a response from has replied
2298         */
2299         while (rmdata->count > 0) {
2300                 tevent_loop_once(ctdb->ev);
2301         }
2302
2303         status = rmdata->status;
2304         talloc_free(mem_ctx);
2305         return status;
2306 }
2307
2308 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2309                                     struct ctdb_recoverd *rec)
2310 {
2311         struct ctdb_iface_list_old *ifaces = NULL;
2312         TALLOC_CTX *mem_ctx;
2313         bool ret = false;
2314
2315         mem_ctx = talloc_new(NULL);
2316
2317         /* Read the interfaces from the local node */
2318         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2319                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2320                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2321                 /* We could return an error.  However, this will be
2322                  * rare so we'll decide that the interfaces have
2323                  * actually changed, just in case.
2324                  */
2325                 talloc_free(mem_ctx);
2326                 return true;
2327         }
2328
2329         if (!rec->ifaces) {
2330                 /* We haven't been here before so things have changed */
2331                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2332                 ret = true;
2333         } else if (rec->ifaces->num != ifaces->num) {
2334                 /* Number of interfaces has changed */
2335                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2336                                      rec->ifaces->num, ifaces->num));
2337                 ret = true;
2338         } else {
2339                 /* See if interface names or link states have changed */
2340                 int i;
2341                 for (i = 0; i < rec->ifaces->num; i++) {
2342                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2343                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2344                                 DEBUG(DEBUG_NOTICE,
2345                                       ("Interface in slot %d changed: %s => %s\n",
2346                                        i, iface->name, ifaces->ifaces[i].name));
2347                                 ret = true;
2348                                 break;
2349                         }
2350                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2351                                 DEBUG(DEBUG_NOTICE,
2352                                       ("Interface %s changed state: %d => %d\n",
2353                                        iface->name, iface->link_state,
2354                                        ifaces->ifaces[i].link_state));
2355                                 ret = true;
2356                                 break;
2357                         }
2358                 }
2359         }
2360
2361         talloc_free(rec->ifaces);
2362         rec->ifaces = talloc_steal(rec, ifaces);
2363
2364         talloc_free(mem_ctx);
2365         return ret;
2366 }
2367
2368 /* Check that the local allocation of public IP addresses is correct
2369  * and do some house-keeping */
2370 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2371                                       struct ctdb_recoverd *rec,
2372                                       uint32_t pnn,
2373                                       struct ctdb_node_map_old *nodemap)
2374 {
2375         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2376         int ret, j;
2377         bool need_takeover_run = false;
2378         struct ctdb_public_ip_list_old *ips = NULL;
2379
2380         /* If we are not the recmaster then do some housekeeping */
2381         if (rec->recmaster != pnn) {
2382                 /* Ignore any IP reallocate requests - only recmaster
2383                  * processes them
2384                  */
2385                 TALLOC_FREE(rec->reallocate_requests);
2386                 /* Clear any nodes that should be force rebalanced in
2387                  * the next takeover run.  If the recovery master role
2388                  * has moved then we don't want to process these some
2389                  * time in the future.
2390                  */
2391                 TALLOC_FREE(rec->force_rebalance_nodes);
2392         }
2393
2394         /* Return early if disabled... */
2395         if (ctdb_config.failover_disabled ||
2396             ctdb_op_is_disabled(rec->takeover_run)) {
2397                 return  0;
2398         }
2399
2400         if (interfaces_have_changed(ctdb, rec)) {
2401                 need_takeover_run = true;
2402         }
2403
2404         /* If there are unhosted IPs but this node can host them then
2405          * trigger an IP reallocation */
2406
2407         /* Read *available* IPs from local node */
2408         ret = ctdb_ctrl_get_public_ips_flags(
2409                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2410                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2411         if (ret != 0) {
2412                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2413                 talloc_free(mem_ctx);
2414                 return -1;
2415         }
2416
2417         for (j=0; j<ips->num; j++) {
2418                 if (ips->ips[j].pnn == -1 &&
2419                     nodemap->nodes[pnn].flags == 0) {
2420                         DEBUG(DEBUG_WARNING,
2421                               ("Unassigned IP %s can be served by this node\n",
2422                                ctdb_addr_to_str(&ips->ips[j].addr)));
2423                         need_takeover_run = true;
2424                 }
2425         }
2426
2427         talloc_free(ips);
2428
2429         if (!ctdb->do_checkpublicip) {
2430                 goto done;
2431         }
2432
2433         /* Validate the IP addresses that this node has on network
2434          * interfaces.  If there is an inconsistency between reality
2435          * and the state expected by CTDB then try to fix it by
2436          * triggering an IP reallocation or releasing extraneous IP
2437          * addresses. */
2438
2439         /* Read *known* IPs from local node */
2440         ret = ctdb_ctrl_get_public_ips_flags(
2441                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2442         if (ret != 0) {
2443                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2444                 talloc_free(mem_ctx);
2445                 return -1;
2446         }
2447
2448         for (j=0; j<ips->num; j++) {
2449                 if (ips->ips[j].pnn == pnn) {
2450                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2451                                 DEBUG(DEBUG_ERR,
2452                                       ("Assigned IP %s not on an interface\n",
2453                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2454                                 need_takeover_run = true;
2455                         }
2456                 } else {
2457                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2458                                 DEBUG(DEBUG_ERR,
2459                                       ("IP %s incorrectly on an interface\n",
2460                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2461                                 need_takeover_run = true;
2462                         }
2463                 }
2464         }
2465
2466 done:
2467         if (need_takeover_run) {
2468                 struct ctdb_srvid_message rd;
2469                 TDB_DATA data;
2470
2471                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2472
2473                 ZERO_STRUCT(rd);
2474                 rd.pnn = ctdb->pnn;
2475                 rd.srvid = 0;
2476                 data.dptr = (uint8_t *)&rd;
2477                 data.dsize = sizeof(rd);
2478
2479                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2480                 if (ret != 0) {
2481                         DEBUG(DEBUG_ERR,
2482                               ("Failed to send takeover run request\n"));
2483                 }
2484         }
2485         talloc_free(mem_ctx);
2486         return 0;
2487 }
2488
2489
2490 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2491 {
2492         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2493
2494         if (node_pnn >= ctdb->num_nodes) {
2495                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2496                 return;
2497         }
2498
2499         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2500
2501 }
2502
2503 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2504         struct ctdb_node_map_old *nodemap,
2505         struct ctdb_node_map_old **remote_nodemaps)
2506 {
2507         uint32_t *nodes;
2508
2509         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2510         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2511                                         nodes, 0,
2512                                         CONTROL_TIMEOUT(), false, tdb_null,
2513                                         async_getnodemap_callback,
2514                                         NULL,
2515                                         remote_nodemaps) != 0) {
2516                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2517
2518                 return -1;
2519         }
2520
2521         return 0;
2522 }
2523
2524 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2525                                      TALLOC_CTX *mem_ctx)
2526 {
2527         struct ctdb_context *ctdb = rec->ctdb;
2528         uint32_t pnn = ctdb_get_pnn(ctdb);
2529         struct ctdb_node_map_old *nodemap = rec->nodemap;
2530         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2531         int ret;
2532
2533         /* When recovery daemon is started, recmaster is set to
2534          * "unknown" so it knows to start an election.
2535          */
2536         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2537                 DEBUG(DEBUG_NOTICE,
2538                       ("Initial recovery master set - forcing election\n"));
2539                 force_election(rec, pnn, nodemap);
2540                 return false;
2541         }
2542
2543         /*
2544          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2545          * but we have, then force an election and try to become the new
2546          * recmaster.
2547          */
2548         if (!ctdb_node_has_capabilities(rec->caps,
2549                                         rec->recmaster,
2550                                         CTDB_CAP_RECMASTER) &&
2551             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2552             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2553                 DEBUG(DEBUG_ERR,
2554                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2555                        " but we (node %u) have - force an election\n",
2556                        rec->recmaster, pnn));
2557                 force_election(rec, pnn, nodemap);
2558                 return false;
2559         }
2560
2561         /* Verify that the master node has not been deleted.  This
2562          * should not happen because a node should always be shutdown
2563          * before being deleted, causing a new master to be elected
2564          * before now.  However, if something strange has happened
2565          * then checking here will ensure we don't index beyond the
2566          * end of the nodemap array. */
2567         if (rec->recmaster >= nodemap->num) {
2568                 DEBUG(DEBUG_ERR,
2569                       ("Recmaster node %u has been deleted. Force election\n",
2570                        rec->recmaster));
2571                 force_election(rec, pnn, nodemap);
2572                 return false;
2573         }
2574
2575         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2576         if (nodemap->nodes[rec->recmaster].flags &
2577             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2578                 DEBUG(DEBUG_NOTICE,
2579                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2580                        rec->recmaster));
2581                 force_election(rec, pnn, nodemap);
2582                 return false;
2583         }
2584
2585         /* get nodemap from the recovery master to check if it is inactive */
2586         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2587                                    mem_ctx, &recmaster_nodemap);
2588         if (ret != 0) {
2589                 DEBUG(DEBUG_ERR,
2590                       (__location__
2591                        " Unable to get nodemap from recovery master %u\n",
2592                           rec->recmaster));
2593                 /* No election, just error */
2594                 return false;
2595         }
2596
2597
2598         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2599             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2600                 DEBUG(DEBUG_NOTICE,
2601                       ("Recmaster node %u is inactive. Force election\n",
2602                        rec->recmaster));
2603                 /*
2604                  * update our nodemap to carry the recmaster's notion of
2605                  * its own flags, so that we don't keep freezing the
2606                  * inactive recmaster node...
2607                  */
2608                 nodemap->nodes[rec->recmaster].flags =
2609                         recmaster_nodemap->nodes[rec->recmaster].flags;
2610                 force_election(rec, pnn, nodemap);
2611                 return false;
2612         }
2613
2614         return true;
2615 }
2616
2617 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2618                       TALLOC_CTX *mem_ctx)
2619 {
2620         uint32_t pnn;
2621         struct ctdb_node_map_old *nodemap=NULL;
2622         struct ctdb_node_map_old **remote_nodemaps=NULL;
2623         struct ctdb_vnn_map *vnnmap=NULL;
2624         struct ctdb_vnn_map *remote_vnnmap=NULL;
2625         uint32_t num_lmasters;
2626         int32_t debug_level;
2627         int i, j, ret;
2628         bool self_ban;
2629
2630
2631         /* verify that the main daemon is still running */
2632         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2633                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2634                 exit(-1);
2635         }
2636
2637         /* ping the local daemon to tell it we are alive */
2638         ctdb_ctrl_recd_ping(ctdb);
2639
2640         if (rec->election_timeout) {
2641                 /* an election is in progress */
2642                 return;
2643         }
2644
2645         /* read the debug level from the parent and update locally */
2646         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2647         if (ret !=0) {
2648                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2649                 return;
2650         }
2651         debuglevel_set(debug_level);
2652
2653         /* get relevant tunables */
2654         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2655         if (ret != 0) {
2656                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2657                 return;
2658         }
2659
2660         /* get runstate */
2661         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2662                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2663         if (ret != 0) {
2664                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2665                 return;
2666         }
2667
2668         pnn = ctdb_get_pnn(ctdb);
2669
2670         /* get nodemap */
2671         TALLOC_FREE(rec->nodemap);
2672         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2673         if (ret != 0) {
2674                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2675                 return;
2676         }
2677         nodemap = rec->nodemap;
2678
2679         /* remember our own node flags */
2680         rec->node_flags = nodemap->nodes[pnn].flags;
2681
2682         ban_misbehaving_nodes(rec, &self_ban);
2683         if (self_ban) {
2684                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2685                 return;
2686         }
2687
2688         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2689                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2690         if (ret != 0) {
2691                 D_ERR("Failed to read recmode from local node\n");
2692                 return;
2693         }
2694
2695         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2696            also frozen and that the recmode is set to active.
2697         */
2698         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2699                 /* If this node has become inactive then we want to
2700                  * reduce the chances of it taking over the recovery
2701                  * master role when it becomes active again.  This
2702                  * helps to stabilise the recovery master role so that
2703                  * it stays on the most stable node.
2704                  */
2705                 rec->priority_time = timeval_current();
2706
2707                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2708                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2709
2710                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2711                         if (ret != 0) {
2712                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2713
2714                                 return;
2715                         }
2716                 }
2717                 if (! rec->frozen_on_inactive) {
2718                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2719                                                CTDB_CURRENT_NODE);
2720                         if (ret != 0) {
2721                                 DEBUG(DEBUG_ERR,
2722                                       (__location__ " Failed to freeze node "
2723                                        "in STOPPED or BANNED state\n"));
2724                                 return;
2725                         }
2726
2727                         rec->frozen_on_inactive = true;
2728                 }
2729
2730                 /* If this node is stopped or banned then it is not the recovery
2731                  * master, so don't do anything. This prevents stopped or banned
2732                  * node from starting election and sending unnecessary controls.
2733                  */
2734                 return;
2735         }
2736
2737         rec->frozen_on_inactive = false;
2738
2739         /* Retrieve capabilities from all connected nodes */
2740         ret = update_capabilities(rec, nodemap);
2741         if (ret != 0) {
2742                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2743                 return;
2744         }
2745
2746         if (! validate_recovery_master(rec, mem_ctx)) {
2747                 return;
2748         }
2749
2750         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2751                 /* Check if an IP takeover run is needed and trigger one if
2752                  * necessary */
2753                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2754         }
2755
2756         /* if we are not the recmaster then we do not need to check
2757            if recovery is needed
2758          */
2759         if (pnn != rec->recmaster) {
2760                 return;
2761         }
2762
2763
2764         /* ensure our local copies of flags are right */
2765         ret = update_local_flags(rec, nodemap);
2766         if (ret != 0) {
2767                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2768                 return;
2769         }
2770
2771         if (ctdb->num_nodes != nodemap->num) {
2772                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2773                 ctdb_load_nodes_file(ctdb);
2774                 return;
2775         }
2776
2777         /* verify that all active nodes agree that we are the recmaster */
2778         switch (verify_recmaster(rec, nodemap, pnn)) {
2779         case MONITOR_RECOVERY_NEEDED:
2780                 /* can not happen */
2781                 return;
2782         case MONITOR_ELECTION_NEEDED:
2783                 force_election(rec, pnn, nodemap);
2784                 return;
2785         case MONITOR_OK:
2786                 break;
2787         case MONITOR_FAILED:
2788                 return;
2789         }
2790
2791
2792         /* get the vnnmap */
2793         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2794         if (ret != 0) {
2795                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2796                 return;
2797         }
2798
2799         if (rec->need_recovery) {
2800                 /* a previous recovery didn't finish */
2801                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2802                 return;
2803         }
2804
2805         /* verify that all active nodes are in normal mode 
2806            and not in recovery mode 
2807         */
2808         switch (verify_recmode(ctdb, nodemap)) {
2809         case MONITOR_RECOVERY_NEEDED:
2810                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2811                 return;
2812         case MONITOR_FAILED:
2813                 return;
2814         case MONITOR_ELECTION_NEEDED:
2815                 /* can not happen */
2816         case MONITOR_OK:
2817                 break;
2818         }
2819
2820
2821         if (ctdb->recovery_lock != NULL) {
2822                 /* We must already hold the recovery lock */
2823                 if (!ctdb_recovery_have_lock(rec)) {
2824                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2825                         ctdb_set_culprit(rec, ctdb->pnn);
2826                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2827                         return;
2828                 }
2829         }
2830
2831
2832         /* If recoveries are disabled then there is no use doing any
2833          * nodemap or flags checks.  Recoveries might be disabled due
2834          * to "reloadnodes", so doing these checks might cause an
2835          * unnecessary recovery.  */
2836         if (ctdb_op_is_disabled(rec->recovery)) {
2837                 goto takeover_run_checks;
2838         }
2839
2840         /* get the nodemap for all active remote nodes
2841          */
2842         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2843         if (remote_nodemaps == NULL) {
2844                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2845                 return;
2846         }
2847         for(i=0; i<nodemap->num; i++) {
2848                 remote_nodemaps[i] = NULL;
2849         }
2850         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2851                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2852                 return;
2853         } 
2854
2855         /* verify that all other nodes have the same nodemap as we have
2856         */
2857         for (j=0; j<nodemap->num; j++) {
2858                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2859                         continue;
2860                 }
2861
2862                 if (remote_nodemaps[j] == NULL) {
2863                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2864                         ctdb_set_culprit(rec, j);
2865
2866                         return;
2867                 }
2868
2869                 /* if the nodes disagree on how many nodes there are
2870                    then this is a good reason to try recovery
2871                  */
2872                 if (remote_nodemaps[j]->num != nodemap->num) {
2873                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2874                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2875                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2876                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2877                         return;
2878                 }
2879
2880                 /* if the nodes disagree on which nodes exist and are
2881                    active, then that is also a good reason to do recovery
2882                  */
2883                 for (i=0;i<nodemap->num;i++) {
2884                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2885                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2886                                           nodemap->nodes[j].pnn, i, 
2887                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2888                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2889                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2890                                             vnnmap);
2891                                 return;
2892                         }
2893                 }
2894         }
2895
2896         /*
2897          * Update node flags obtained from each active node. This ensure we have
2898          * up-to-date information for all the nodes.
2899          */
2900         for (j=0; j<nodemap->num; j++) {
2901                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2902                         continue;
2903                 }
2904                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2905         }
2906
2907         for (j=0; j<nodemap->num; j++) {
2908                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2909                         continue;
2910                 }
2911
2912                 /* verify the flags are consistent
2913                 */
2914                 for (i=0; i<nodemap->num; i++) {
2915                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2916                                 continue;
2917                         }
2918                         
2919                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2920                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2921                                   nodemap->nodes[j].pnn, 
2922                                   nodemap->nodes[i].pnn, 
2923                                   remote_nodemaps[j]->nodes[i].flags,
2924                                   nodemap->nodes[i].flags));
2925                                 if (i == j) {
2926                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2927                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2928                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2929                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2930                                                     vnnmap);
2931                                         return;
2932                                 } else {
2933                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2934                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2935                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2936                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2937                                                     vnnmap);
2938                                         return;
2939                                 }
2940                         }
2941                 }
2942         }
2943
2944
2945         /* count how many active nodes there are */
2946         num_lmasters  = 0;
2947         for (i=0; i<nodemap->num; i++) {
2948                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2949                         if (ctdb_node_has_capabilities(rec->caps,
2950                                                        ctdb->nodes[i]->pnn,
2951                                                        CTDB_CAP_LMASTER)) {
2952                                 num_lmasters++;
2953                         }
2954                 }
2955         }
2956
2957
2958         /* There must be the same number of lmasters in the vnn map as
2959          * there are active nodes with the lmaster capability...  or
2960          * do a recovery.
2961          */
2962         if (vnnmap->size != num_lmasters) {
2963                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2964                           vnnmap->size, num_lmasters));
2965                 ctdb_set_culprit(rec, ctdb->pnn);
2966                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2967                 return;
2968         }
2969
2970         /* verify that all active nodes in the nodemap also exist in 
2971            the vnnmap.
2972          */
2973         for (j=0; j<nodemap->num; j++) {
2974                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2975                         continue;
2976                 }
2977                 if (nodemap->nodes[j].pnn == pnn) {
2978                         continue;
2979                 }
2980
2981                 for (i=0; i<vnnmap->size; i++) {
2982                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2983                                 break;
2984                         }
2985                 }
2986                 if (i == vnnmap->size) {
2987                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2988                                   nodemap->nodes[j].pnn));
2989                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2990                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2991                         return;
2992                 }
2993         }
2994
2995         
2996         /* verify that all other nodes have the same vnnmap
2997            and are from the same generation
2998          */
2999         for (j=0; j<nodemap->num; j++) {
3000                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3001                         continue;
3002                 }
3003                 if (nodemap->nodes[j].pnn == pnn) {
3004                         continue;
3005                 }
3006
3007                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3008                                           mem_ctx, &remote_vnnmap);
3009                 if (ret != 0) {
3010                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3011                                   nodemap->nodes[j].pnn));
3012                         return;
3013                 }
3014
3015                 /* verify the vnnmap generation is the same */
3016                 if (vnnmap->generation != remote_vnnmap->generation) {
3017                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3018                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3019                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3020                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3021                         return;
3022                 }
3023
3024                 /* verify the vnnmap size is the same */
3025                 if (vnnmap->size != remote_vnnmap->size) {
3026                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3027                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3028                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3029                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3030                         return;
3031                 }
3032
3033                 /* verify the vnnmap is the same */
3034                 for (i=0;i<vnnmap->size;i++) {
3035                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3036                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3037                                           nodemap->nodes[j].pnn));
3038                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3039                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3040                                             vnnmap);
3041                                 return;
3042                         }
3043                 }
3044         }
3045
3046         /* FIXME: Add remote public IP checking to ensure that nodes
3047          * have the IP addresses that are allocated to them. */
3048
3049 takeover_run_checks:
3050
3051         /* If there are IP takeover runs requested or the previous one
3052          * failed then perform one and notify the waiters */
3053         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3054             (rec->reallocate_requests || rec->need_takeover_run)) {
3055                 process_ipreallocate_requests(ctdb, rec);
3056         }
3057 }
3058
3059 static void recd_sig_term_handler(struct tevent_context *ev,
3060                                   struct tevent_signal *se, int signum,
3061                                   int count, void *dont_care,
3062                                   void *private_data)
3063 {
3064         struct ctdb_recoverd *rec = talloc_get_type_abort(
3065                 private_data, struct ctdb_recoverd);
3066
3067         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
3068         ctdb_recovery_unlock(rec);
3069         exit(0);
3070 }
3071
3072
3073 /*
3074   the main monitoring loop
3075  */
3076 static void monitor_cluster(struct ctdb_context *ctdb)
3077 {
3078         struct tevent_signal *se;
3079         struct ctdb_recoverd *rec;
3080
3081         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3082
3083         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3084         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3085
3086         rec->ctdb = ctdb;
3087         rec->recmaster = CTDB_UNKNOWN_PNN;
3088         rec->recovery_lock_handle = NULL;
3089
3090         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3091         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3092
3093         rec->recovery = ctdb_op_init(rec, "recoveries");
3094         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3095
3096         rec->priority_time = timeval_current();
3097         rec->frozen_on_inactive = false;
3098
3099         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3100                                recd_sig_term_handler, rec);
3101         if (se == NULL) {
3102                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3103                 exit(1);
3104         }
3105
3106         /* register a message port for sending memory dumps */
3107         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3108
3109         /* when a node is assigned banning credits */
3110         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3111                                         banning_handler, rec);
3112
3113         /* register a message port for recovery elections */
3114         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3115
3116         /* when nodes are disabled/enabled */
3117         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3118
3119         /* when we are asked to puch out a flag change */
3120         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3121
3122         /* register a message port for vacuum fetch */
3123         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3124
3125         /* register a message port for reloadnodes  */
3126         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3127
3128         /* register a message port for performing a takeover run */
3129         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3130
3131         /* register a message port for disabling the ip check for a short while */
3132         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3133
3134         /* register a message port for forcing a rebalance of a node next
3135            reallocation */
3136         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3137
3138         /* Register a message port for disabling takeover runs */
3139         ctdb_client_set_message_handler(ctdb,
3140                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3141                                         disable_takeover_runs_handler, rec);
3142
3143         /* Register a message port for disabling recoveries */
3144         ctdb_client_set_message_handler(ctdb,
3145                                         CTDB_SRVID_DISABLE_RECOVERIES,
3146                                         disable_recoveries_handler, rec);
3147
3148         /* register a message port for detaching database */
3149         ctdb_client_set_message_handler(ctdb,
3150                                         CTDB_SRVID_DETACH_DATABASE,
3151                                         detach_database_handler, rec);
3152
3153         for (;;) {
3154                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3155                 struct timeval start;
3156                 double elapsed;
3157
3158                 if (!mem_ctx) {
3159                         DEBUG(DEBUG_CRIT,(__location__
3160                                           " Failed to create temp context\n"));
3161                         exit(-1);
3162                 }
3163
3164                 start = timeval_current();
3165                 main_loop(ctdb, rec, mem_ctx);
3166                 talloc_free(mem_ctx);
3167
3168                 /* we only check for recovery once every second */
3169                 elapsed = timeval_elapsed(&start);
3170                 if (elapsed < ctdb->tunable.recover_interval) {
3171                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3172                                           - elapsed);
3173                 }
3174         }
3175 }
3176
3177 /*
3178   event handler for when the main ctdbd dies
3179  */
3180 static void ctdb_recoverd_parent(struct tevent_context *ev,
3181                                  struct tevent_fd *fde,
3182                                  uint16_t flags, void *private_data)
3183 {
3184         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3185         _exit(1);
3186 }
3187
3188 /*
3189   called regularly to verify that the recovery daemon is still running
3190  */
3191 static void ctdb_check_recd(struct tevent_context *ev,
3192                             struct tevent_timer *te,
3193                             struct timeval yt, void *p)
3194 {
3195         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3196
3197         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3198                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3199
3200                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3201                                  ctdb_restart_recd, ctdb);
3202
3203                 return;
3204         }
3205
3206         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3207                          timeval_current_ofs(30, 0),
3208                          ctdb_check_recd, ctdb);
3209 }
3210
3211 static void recd_sig_child_handler(struct tevent_context *ev,
3212                                    struct tevent_signal *se, int signum,
3213                                    int count, void *dont_care,
3214                                    void *private_data)
3215 {
3216 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3217         int status;
3218         pid_t pid = -1;
3219
3220         while (pid != 0) {
3221                 pid = waitpid(-1, &status, WNOHANG);
3222                 if (pid == -1) {
3223                         if (errno != ECHILD) {
3224                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3225                         }
3226                         return;
3227                 }
3228                 if (pid > 0) {
3229                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3230                 }
3231         }
3232 }
3233
3234 /*
3235   startup the recovery daemon as a child of the main ctdb daemon
3236  */
3237 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3238 {
3239         int fd[2];
3240         struct tevent_signal *se;
3241         struct tevent_fd *fde;
3242         int ret;
3243
3244         if (pipe(fd) != 0) {
3245                 return -1;
3246         }
3247
3248         ctdb->recoverd_pid = ctdb_fork(ctdb);
3249         if (ctdb->recoverd_pid == -1) {
3250                 return -1;
3251         }
3252
3253         if (ctdb->recoverd_pid != 0) {
3254                 talloc_free(ctdb->recd_ctx);
3255                 ctdb->recd_ctx = talloc_new(ctdb);
3256                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3257
3258                 close(fd[0]);
3259                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3260                                  timeval_current_ofs(30, 0),
3261                                  ctdb_check_recd, ctdb);
3262                 return 0;
3263         }
3264
3265         close(fd[1]);
3266
3267         srandom(getpid() ^ time(NULL));
3268
3269         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3270         if (ret != 0) {
3271                 return -1;
3272         }
3273
3274         prctl_set_comment("ctdb_recoverd");
3275         if (switch_from_server_to_client(ctdb) != 0) {
3276                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3277                 exit(1);
3278         }
3279
3280         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3281
3282         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3283                             ctdb_recoverd_parent, &fd[0]);
3284         tevent_fd_set_auto_close(fde);
3285
3286         /* set up a handler to pick up sigchld */
3287         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3288                                recd_sig_child_handler, ctdb);
3289         if (se == NULL) {
3290                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3291                 exit(1);
3292         }
3293
3294         monitor_cluster(ctdb);
3295
3296         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3297         return -1;
3298 }
3299
3300 /*
3301   shutdown the recovery daemon
3302  */
3303 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3304 {
3305         if (ctdb->recoverd_pid == 0) {
3306                 return;
3307         }
3308
3309         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3310         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3311
3312         TALLOC_FREE(ctdb->recd_ctx);
3313         TALLOC_FREE(ctdb->recd_ping_count);
3314 }
3315
3316 static void ctdb_restart_recd(struct tevent_context *ev,
3317                               struct tevent_timer *te,
3318                               struct timeval t, void *private_data)
3319 {
3320         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3321
3322         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3323         ctdb_stop_recoverd(ctdb);
3324         ctdb_start_recoverd(ctdb);
3325 }