/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"

#include <popt.h>
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>

#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
#include "lib/util/sys_rw.h"
#include "lib/util/util_process.h"

#include "ctdb_private.h"
#include "ctdb_client.h"

#include "common/system_socket.h"
#include "common/common.h"
#include "common/logging.h"

#include "server/ctdb_config.h"

#include "ctdb_cluster_mutex.h"

/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct ctdb_srvid_message *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct ctdb_srvid_message *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        if (*requests == NULL) {
                return;
        }

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct ctdb_srvid_message *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}
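
/*
 * Typical usage (a sketch, not prescribed by the code above): callers queue
 * a deferred reply with srvid_request_add() and, once the operation
 * completes, answer every queued requester in one go, e.g.:
 *
 *   int32_t ret = 0;
 *   TDB_DATA result = { .dptr = (uint8_t *)&ret, .dsize = sizeof(ret) };
 *   srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *   ...
 *   srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 *
 * A request sent with srvid == 0 is fire-and-forget: srvid_request_reply()
 * just frees it without sending anything back.
 */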

/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        struct tevent_timer *timer;
        bool in_progress;
        const char *name;
};

static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
        struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);

        if (state != NULL) {
                state->in_progress = false;
                state->name = name;
        }

        return state;
}

static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
        return state->timer != NULL;
}

static bool ctdb_op_begin(struct ctdb_op_state *state)
{
        if (ctdb_op_is_disabled(state)) {
                DEBUG(DEBUG_NOTICE,
                      ("Unable to begin - %s are disabled\n", state->name));
                return false;
        }

        state->in_progress = true;
        return true;
}

static bool ctdb_op_end(struct ctdb_op_state *state)
{
        return state->in_progress = false;
}

static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
        return state->in_progress;
}

static void ctdb_op_enable(struct ctdb_op_state *state)
{
        TALLOC_FREE(state->timer);
}

static void ctdb_op_timeout_handler(struct tevent_context *ev,
                                    struct tevent_timer *te,
                                    struct timeval yt, void *p)
{
        struct ctdb_op_state *state =
                talloc_get_type(p, struct ctdb_op_state);

        DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
        ctdb_op_enable(state);
}

static int ctdb_op_disable(struct ctdb_op_state *state,
                           struct tevent_context *ev,
                           uint32_t timeout)
{
        if (timeout == 0) {
                DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
                ctdb_op_enable(state);
                return 0;
        }

        if (state->in_progress) {
                DEBUG(DEBUG_ERR,
                      ("Unable to disable %s - in progress\n", state->name));
                return -EAGAIN;
        }

        DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
                            state->name, timeout));

        /* Clear any old timers */
        talloc_free(state->timer);

        /* Arrange for the timeout to occur */
        state->timer = tevent_add_timer(ev, state,
                                        timeval_current_ofs(timeout, 0),
                                        ctdb_op_timeout_handler, state);
        if (state->timer == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
                return -ENOMEM;
        }

        return 0;
}
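
/*
 * Typical ctdb_op_state lifecycle (a sketch based on the API above):
 *
 *   struct ctdb_op_state *op = ctdb_op_init(rec, "takeover runs");
 *
 *   if (ctdb_op_begin(op)) {          // refused while disabled
 *           ... do the operation ...
 *           ctdb_op_end(op);
 *   }
 *
 *   ctdb_op_disable(op, ev, 60);      // disable for 60s; a timer re-enables
 *   ctdb_op_disable(op, ev, 0);       // or re-enable immediately
 *
 * Disabling fails with -EAGAIN while the operation is in progress.
 */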

struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

struct ctdb_recovery_lock_handle;

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t last_culprit_node;
        struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct tevent_timer *send_election_te;
        struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_iface_list_old *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
        bool frozen_on_inactive;
        struct ctdb_recovery_lock_handle *recovery_lock_handle;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct tevent_context *ev,
                              struct tevent_timer *te, struct timeval t,
                              void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_state bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the trouble maker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit > ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the trouble maker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}

/*
  Retrieve capabilities from all connected nodes
 */
static int update_capabilities(struct ctdb_recoverd *rec,
                               struct ctdb_node_map_old *nodemap)
{
        uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node_capabilities *caps;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        caps = ctdb_get_capabilities(ctdb, tmp_ctx,
                                     CONTROL_TIMEOUT(), nodemap);

        if (caps == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Failed to get node capabilities\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
        if (capp == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__
                       " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
        ctdb->capabilities = *capp;

        TALLOC_FREE(rec->caps);
        rec->caps = talloc_steal(rec, caps);

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
                             uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}
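
/*
 * Usage sketch (constant names assumed from the ctdb protocol headers):
 * rec_mode is one of the CTDB_RECOVERY_* values, e.g.
 *
 *   if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL) != 0) {
 *           ... recovery failed ...
 *   }
 *
 * The mode is broadcast as CTDB_CONTROL_SET_RECMODE to all active nodes
 * via the async control helper.
 */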

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].db_id, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(),
                                                  nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].db_id,
                                                  mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn,
                                                 mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags, NULL);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}

/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/**
 * Process one element of the vacuum fetch list:
 * Migrate it over to us with the special flag
 * CTDB_CALL_FLAG_VACUUM_MIGRATION.
 */
static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
                                     uint32_t pnn,
                                     struct ctdb_rec_data_old *r)
{
        struct ctdb_client_call_state *state;
        TDB_DATA data;
        struct ctdb_ltdb_header *hdr;
        struct ctdb_call call;

        ZERO_STRUCT(call);
        call.call_id = CTDB_NULL_FUNC;
        call.flags = CTDB_IMMEDIATE_MIGRATION;
        call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

        call.key.dptr = &r->data[0];
        call.key.dsize = r->keylen;

        /* ensure we don't block this daemon - just skip a record if we can't get
           the chainlock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
                return true;
        }

        data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
        if (data.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (hdr->dmaster == pnn) {
                /* it's already local */
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        free(data.dptr);

        state = ctdb_call_send(ctdb_db, &call);
        tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                return false;
        }
        state->async.fn = vacuum_fetch_callback;
        state->async.private_data = NULL;

        return true;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
                                 void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
        uint8_t db_flags = 0;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;

        recs = (struct ctdb_marshall_buffer *)data.dptr;

        if (recs->count == 0) {
                goto done;
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                goto done;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
                        db_flags = dbmap->dbs[i].flags;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                goto done;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                goto done;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
        }

        r = (struct ctdb_rec_data_old *)&recs->data[0];
        while (recs->count) {
                bool ok;

                ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
                if (!ok) {
                        break;
                }

                r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
                recs->count--;
        }

done:
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(uint64_t srvid, TDB_DATA data,
                                    void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t db_id;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
                         ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                tevent_loop_once(ctdb->ev);
        }
}
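
/*
 * Example: ctdb_wait_timeout(ctdb, 0.5) pumps the event loop for roughly
 * half a second.  It blocks only in the sense that this function does not
 * return earlier; queued tevent handlers still run while it waits.
 */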

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct tevent_context *ev,
                                  struct tevent_timer *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map_old *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return -1;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                return -1;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return 0;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}

static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
{
        return (rec->recovery_lock_handle != NULL);
}

struct ctdb_recovery_lock_handle {
        bool done;
        bool locked;
        double latency;
        struct ctdb_cluster_mutex_handle *h;
};

static void take_reclock_handler(char status,
                                 double latency,
                                 void *private_data)
{
        struct ctdb_recovery_lock_handle *s =
                (struct ctdb_recovery_lock_handle *) private_data;

        switch (status) {
        case '0':
                s->latency = latency;
                break;

        case '1':
                DEBUG(DEBUG_ERR,
                      ("Unable to take recovery lock - contention\n"));
                break;

        default:
                DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
        }

        s->done = true;
        s->locked = (status == '0');
}

static bool ctdb_recovery_lock(struct ctdb_recoverd *rec);

static void lost_reclock_handler(void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type_abort(
                private_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,
              ("Recovery lock helper terminated unexpectedly - "
               "trying to retake recovery lock\n"));
        TALLOC_FREE(rec->recovery_lock_handle);
        if (! ctdb_recovery_lock(rec)) {
                DEBUG(DEBUG_ERR, ("Failed to take recovery lock\n"));
        }
}

static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_cluster_mutex_handle *h;
        struct ctdb_recovery_lock_handle *s;

        s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
        if (s == NULL) {
                DBG_ERR("Memory allocation error\n");
                return false;
        }

        h = ctdb_cluster_mutex(s,
                               ctdb,
                               ctdb->recovery_lock,
                               0,
                               take_reclock_handler,
                               s,
                               lost_reclock_handler,
                               rec);
        if (h == NULL) {
                talloc_free(s);
                return false;
        }

        rec->recovery_lock_handle = s;
        s->h = h;

        while (! s->done) {
                tevent_loop_once(ctdb->ev);
        }

        if (! s->locked) {
                TALLOC_FREE(rec->recovery_lock_handle);
                return false;
        }

        ctdb_ctrl_report_recd_lock_latency(ctdb,
                                           CONTROL_TIMEOUT(),
                                           s->latency);

        return true;
}
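
/*
 * Note: ctdb_recovery_lock() is deliberately synchronous - it spins the
 * event loop until the cluster mutex helper reports back.  The helper's
 * one-character status codes are interpreted in take_reclock_handler():
 * '0' means the lock was taken, '1' means contention, anything else is an
 * error.  On success the measured latency is reported to the main daemon.
 */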

static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
{
        if (rec->recovery_lock_handle == NULL) {
                return;
        }

        if (! rec->recovery_lock_handle->done) {
                /*
                 * Taking of recovery lock still in progress.  Free
                 * the cluster mutex handle to release it but leave
                 * the recovery lock handle in place to allow taking
                 * of the lock to fail.
                 */
                D_NOTICE("Cancelling recovery lock\n");
                TALLOC_FREE(rec->recovery_lock_handle->h);
                rec->recovery_lock_handle->done = true;
                rec->recovery_lock_handle->locked = false;
                return;
        }

        D_NOTICE("Releasing recovery lock\n");
        TALLOC_FREE(rec->recovery_lock_handle);
}

static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i;
        struct ctdb_banning_state *ban_state;

        *self_ban = false;
        for (i=0; i<ctdb->num_nodes; i++) {
                if (ctdb->nodes[i]->ban_state == NULL) {
                        continue;
                }
                ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
                if (ban_state->count < 2*ctdb->num_nodes) {
                        continue;
                }

                DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
                        ctdb->nodes[i]->pnn, ban_state->count,
                        ctdb->tunable.recovery_ban_period));
                ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
                ban_state->count = 0;

                /* Banning ourself? */
                if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
                        *self_ban = true;
                }
        }
}
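
/*
 * Banning credits accumulate via ctdb_set_culprit{,_count}() and are reset
 * to zero once a node behaves for longer than the recovery_grace_period
 * tunable.  A node that collects 2 * num_nodes credits is banned for
 * recovery_ban_period seconds, as implemented above.
 */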

struct helper_state {
        int fd[2];
        pid_t pid;
        int result;
        bool done;
};

static void helper_handler(struct tevent_context *ev,
                           struct tevent_fd *fde,
                           uint16_t flags, void *private_data)
{
        struct helper_state *state = talloc_get_type_abort(
                private_data, struct helper_state);
        int ret;

        ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
        if (ret != sizeof(state->result)) {
                state->result = EPIPE;
        }

        state->done = true;
}

static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
                      const char *prog, const char *arg, const char *type)
{
        struct helper_state *state;
        struct tevent_fd *fde;
        const char **args;
        int nargs, ret;
        uint32_t recmaster = rec->recmaster;

        state = talloc_zero(mem_ctx, struct helper_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        state->pid = -1;

        ret = pipe(state->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create pipe for %s helper\n", type));
                goto fail;
        }

        set_close_on_exec(state->fd[0]);

        nargs = 4;
        args = talloc_array(state, const char *, nargs);
        if (args == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }

        args[0] = talloc_asprintf(args, "%d", state->fd[1]);
        if (args[0] == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                goto fail;
        }
        args[1] = rec->ctdb->daemon.name;
        args[2] = arg;
        args[3] = NULL;

        if (args[2] == NULL) {
                nargs = 3;
        }

        state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
        if (state->pid == -1) {
                DEBUG(DEBUG_ERR,
                      ("Failed to create child for %s helper\n", type));
                goto fail;
        }

        close(state->fd[1]);
        state->fd[1] = -1;

        state->done = false;

        fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
                            TEVENT_FD_READ, helper_handler, state);
        if (fde == NULL) {
                goto fail;
        }
        tevent_fd_set_auto_close(fde);

        while (!state->done) {
                tevent_loop_once(rec->ctdb->ev);

                /* If recmaster changes, we have lost election */
                if (recmaster != rec->recmaster) {
                        D_ERR("Recmaster changed to %u, aborting %s\n",
                              rec->recmaster, type);
                        state->result = 1;
                        break;
                }
        }

        close(state->fd[0]);
        state->fd[0] = -1;

        if (state->result != 0) {
                goto fail;
        }

        ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        talloc_free(state);
        return 0;

fail:
        if (state->fd[0] != -1) {
                close(state->fd[0]);
        }
        if (state->fd[1] != -1) {
                close(state->fd[1]);
        }
        if (state->pid != -1) {
                ctdb_kill(rec->ctdb, state->pid, SIGKILL);
        }
        talloc_free(state);
        return -1;
}
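
/*
 * Helper protocol (as implemented above): the child is spawned with
 * argv = { <write-fd>, <ctdb socket name>, [<extra arg>] } and is expected
 * to write a single int result down the pipe when it finishes.  helper_run()
 * pumps the event loop until that happens, aborting (result 1) if this node
 * loses the recovery master role in the meantime.
 */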


static int ctdb_takeover(struct ctdb_recoverd *rec,
                         uint32_t *force_rebalance_nodes)
{
        static char prog[PATH_MAX+1] = "";
        char *arg;
        int i, ret;

        if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
                             "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
                             "ctdb_takeover_helper")) {
                ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
        }

        arg = NULL;
        for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
                uint32_t pnn = force_rebalance_nodes[i];
                if (arg == NULL) {
                        arg = talloc_asprintf(rec, "%u", pnn);
                } else {
                        arg = talloc_asprintf_append(arg, ",%u", pnn);
                }
                if (arg == NULL) {
                        DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                        return -1;
                }
        }

        if (ctdb_config.failover_disabled) {
                ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
                if (ret != 0) {
                        D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
                        return -1;
                }
        }

        return helper_run(rec, rec, prog, arg, "takeover");
}

static bool do_takeover_run(struct ctdb_recoverd *rec,
                            struct ctdb_node_map_old *nodemap)
{
        uint32_t *nodes = NULL;
        struct ctdb_disable_message dtr;
        TDB_DATA data;
        int i;
        uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
        int ret;
        bool ok;

        DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));

        if (ctdb_op_is_in_progress(rec->takeover_run)) {
                DEBUG(DEBUG_ERR, (__location__
                                  " takeover run already in progress\n"));
                ok = false;
                goto done;
        }

        if (!ctdb_op_begin(rec->takeover_run)) {
                ok = false;
                goto done;
        }

        /* Disable IP checks (takeover runs, really) on other nodes
         * while doing this takeover run.  This will stop those other
         * nodes from triggering takeover runs when they think they should
         * be hosting an IP but it isn't yet on an interface.  Don't
         * wait for replies since a failure here might cause some
         * noise in the logs but will not actually cause a problem.
         */
        ZERO_STRUCT(dtr);
        dtr.srvid = 0; /* No reply */
        dtr.pnn = -1;

        data.dptr  = (uint8_t*)&dtr;
        data.dsize = sizeof(dtr);

        nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);

        /* Disable for 60 seconds.  This can be a tunable later if
         * necessary.
         */
        dtr.timeout = 60;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
                }
        }

        ret = ctdb_takeover(rec, rec->force_rebalance_nodes);

        /* Reenable takeover runs and IP checks on other nodes */
        dtr.timeout = 0;
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (ctdb_client_send_message(rec->ctdb, nodes[i],
                                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                             data) != 0) {
                        DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
                }
        }

        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
                ok = false;
                goto done;
        }

        ok = true;
        /* Takeover run was successful so clear force rebalance targets */
        if (rebalance_nodes == rec->force_rebalance_nodes) {
                TALLOC_FREE(rec->force_rebalance_nodes);
        } else {
                DEBUG(DEBUG_WARNING,
                      ("Rebalance target nodes changed during takeover run - not clearing\n"));
        }
done:
        rec->need_takeover_run = !ok;
        talloc_free(nodes);
        ctdb_op_end(rec->takeover_run);

        DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
        return ok;
}

static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
{
        static char prog[PATH_MAX+1] = "";
        const char *arg;

        if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
                             "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
                             "ctdb_recovery_helper")) {
                ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
        }

        arg = talloc_asprintf(mem_ctx, "%u", new_generation());
        if (arg == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
                return -1;
        }

        setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);

        return helper_run(rec, mem_ctx, prog, arg, "recovery");
}

/*
  we are the recmaster, and recovery is needed - start a recovery run
 */
static int do_recovery(struct ctdb_recoverd *rec,
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
        bool self_ban;

        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));

        /* Check if the current node is still the recmaster.  It's possible that
         * re-election has changed the recmaster.
         */
        if (pnn != rec->recmaster) {
                DEBUG(DEBUG_NOTICE,
                      ("Recovery master changed to %u, aborting recovery\n",
                       rec->recmaster));
                return -1;
        }

        /* if recovery fails, force it again */
        rec->need_recovery = true;

        if (!ctdb_op_begin(rec->recovery)) {
                return -1;
        }

        if (rec->election_timeout) {
                /* an election is in progress */
                DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
                goto fail;
        }

        ban_misbehaving_nodes(rec, &self_ban);
        if (self_ban) {
                DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
                goto fail;
        }

        if (ctdb->recovery_lock != NULL) {
                if (ctdb_recovery_have_lock(rec)) {
                        D_NOTICE("Already holding recovery lock\n");
                } else {
                        bool ok;

                        D_NOTICE("Attempting to take recovery lock (%s)\n",
                                 ctdb->recovery_lock);

                        ok = ctdb_recovery_lock(rec);
                        if (! ok) {
                                D_ERR("Unable to take recovery lock\n");

                                if (pnn != rec->recmaster) {
                                        D_NOTICE("Recovery master changed to %u,"
                                                 " aborting recovery\n",
                                                 rec->recmaster);
                                        rec->need_recovery = false;
                                        goto fail;
                                }

                                if (ctdb->runstate ==
                                    CTDB_RUNSTATE_FIRST_RECOVERY) {
                                        /*
                                         * First recovery?  Perhaps
                                         * current node does not yet
                                         * know who the recmaster is.
                                         */
                                        D_ERR("Retrying recovery\n");
                                        goto fail;
                                }

                                D_ERR("Abort recovery, "
                                      "ban this node for %u seconds\n",
                                      ctdb->tunable.recovery_ban_period);
                                ctdb_ban_node(rec,
                                              pnn,
                                              ctdb->tunable.recovery_ban_period);
                                goto fail;
                        }
                        D_NOTICE("Recovery lock taken successfully\n");
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));

        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
                goto fail;
        }

        /* we do the db creation before we set the recovery mode, so the freeze happens
           on all databases we will be dealing with. */

        /* verify that we have all the databases any other node has */
        ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
                goto fail;
        }

        /* verify that all other nodes have all our databases */
        ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                goto fail;
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));

        /* Retrieve capabilities from all connected nodes */
        ret = update_capabilities(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                return -1;
        }

        /*
          update all nodes to have the same flags that we have
         */
        for (i=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
                        if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                                DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
                        } else {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                                return -1;
                        }
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));

        ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }

        do_takeover_run(rec, nodemap);

        /* send a message to all clients telling them that the cluster
           has been reconfigured */
        ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                       CTDB_SRVID_RECONFIGURE, tdb_null);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
                goto fail;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));

        rec->need_recovery = false;
        ctdb_op_end(rec->recovery);

        /* we managed to complete a full recovery, make sure to forgive
           any past sins by the nodes that could now participate in the
           recovery.
        */
        DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
        for (i=0;i<nodemap->num;i++) {
                struct ctdb_banning_state *ban_state;

                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
                if (ban_state == NULL) {
                        continue;
                }

                ban_state->count = 0;
        }

        /* We just finished a recovery successfully.
           We now wait for rerecovery_timeout before we allow
           another recovery to take place.
        */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_op_disable(rec->recovery, ctdb->ev,
                        ctdb->tunable.rerecovery_timeout);
        return 0;

fail:
        ctdb_op_end(rec->recovery);
        return -1;
}
1502
1503
1504 /*
1505   elections are won by first checking the number of connected nodes, then
1506   the priority time, then the pnn
1507  */
1508 struct election_message {
1509         uint32_t num_connected;
1510         struct timeval priority_time;
1511         uint32_t pnn;
1512         uint32_t node_flags;
1513 };
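
/* Illustrative walk-through of the ordering above (example values only,
   not taken from this file): if node A reports num_connected=4 and
   node B reports num_connected=3, A wins outright; if both report 4,
   the older priority_time (the longest running recovery daemon) wins;
   only if those also tie does the pnn decide. */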
1514
1515 /*
1516   form this node's election data
1517  */
1518 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1519 {
1520         int ret, i;
1521         struct ctdb_node_map_old *nodemap;
1522         struct ctdb_context *ctdb = rec->ctdb;
1523
1524         ZERO_STRUCTP(em);
1525
1526         em->pnn = rec->ctdb->pnn;
1527         em->priority_time = rec->priority_time;
1528
1529         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1530         if (ret != 0) {
1531                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1532                 return;
1533         }
1534
1535         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1536         em->node_flags = rec->node_flags;
1537
1538         for (i=0;i<nodemap->num;i++) {
1539                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1540                         em->num_connected++;
1541                 }
1542         }
1543
1544         /* we shouldn't try to win this election if we can't be a recmaster */
1545         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1546                 em->num_connected = 0;
1547                 em->priority_time = timeval_current();
1548         }
1549
1550         talloc_free(nodemap);
1551 }
1552
1553 /*
1554   see if the given election data wins
1555  */
1556 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1557 {
1558         struct election_message myem;
1559         int cmp = 0;
1560
1561         ctdb_election_data(rec, &myem);
1562
1563         /* we can't win if we don't have the recmaster capability */
1564         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1565                 return false;
1566         }
1567
1568         /* we can't win if we are banned */
1569         if (rec->node_flags & NODE_FLAGS_BANNED) {
1570                 return false;
1571         }
1572
1573         /* we can't win if we are stopped */
1574         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1575                 return false;
1576         }
1577
1578         /* we will automatically win if the other node is banned */
1579         if (em->node_flags & NODE_FLAGS_BANNED) {
1580                 return true;
1581         }
1582
1583         /* we will automatically win if the other node is stopped */
1584         if (em->node_flags & NODE_FLAGS_STOPPED) {
1585                 return true;
1586         }
1587
        /* try to use the most connected node; without this comparison the
           num_connected field gathered above would never be used */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
        }

1588         /* then the longest running node */
1589         if (cmp == 0) {
1590                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1591         }
1592
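        /* finally fall back to the pnn as a deterministic tie-break */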
1593         if (cmp == 0) {
1594                 cmp = (int)myem.pnn - (int)em->pnn;
1595         }
1596
1597         return cmp > 0;
1598 }
1599
1600 /*
1601   send out an election request
1602  */
1603 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1604 {
1605         int ret;
1606         TDB_DATA election_data;
1607         struct election_message emsg;
1608         uint64_t srvid;
1609         struct ctdb_context *ctdb = rec->ctdb;
1610
1611         srvid = CTDB_SRVID_ELECTION;
1612
1613         ctdb_election_data(rec, &emsg);
1614
1615         election_data.dsize = sizeof(struct election_message);
1616         election_data.dptr  = (unsigned char *)&emsg;
1617
1618
1619         /* first we assume we will win the election and set the
1620            recovery master to be ourselves on the current node
1621          */
1622         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1623                                      CTDB_CURRENT_NODE, pnn);
1624         if (ret != 0) {
1625                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1626                 return -1;
1627         }
1628         rec->recmaster = pnn;
1629
1630         /* send an election message to all active nodes */
1631         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1632         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1633 }
1634
1635 /*
1636   we think we are winning the election - send a broadcast election request
1637  */
1638 static void election_send_request(struct tevent_context *ev,
1639                                   struct tevent_timer *te,
1640                                   struct timeval t, void *p)
1641 {
1642         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1643         int ret;
1644
1645         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1646         if (ret != 0) {
1647                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1648         }
1649
1650         TALLOC_FREE(rec->send_election_te);
1651 }
1652
1653 /*
1654   handler for memory dumps
1655 */
1656 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1657 {
1658         struct ctdb_recoverd *rec = talloc_get_type(
1659                 private_data, struct ctdb_recoverd);
1660         struct ctdb_context *ctdb = rec->ctdb;
1661         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1662         TDB_DATA *dump;
1663         int ret;
1664         struct ctdb_srvid_message *rd;
1665
1666         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1667                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1668                 talloc_free(tmp_ctx);
1669                 return;
1670         }
1671         rd = (struct ctdb_srvid_message *)data.dptr;
1672
1673         dump = talloc_zero(tmp_ctx, TDB_DATA);
1674         if (dump == NULL) {
1675                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1676                 talloc_free(tmp_ctx);
1677                 return;
1678         }
1679         ret = ctdb_dump_memory(ctdb, dump);
1680         if (ret != 0) {
1681                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1682                 talloc_free(tmp_ctx);
1683                 return;
1684         }
1685
1686         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1687
1688         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1689         if (ret != 0) {
1690                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1691                 talloc_free(tmp_ctx);
1692                 return;
1693         }
1694
1695         talloc_free(tmp_ctx);
1696 }
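
/* A sketch of the requesting side, assuming the client API used elsewhere
   in this file and that this handler is registered on CTDB_SRVID_MEM_DUMP
   (the local variable names here are illustrative only):

     struct ctdb_srvid_message msg = { .pnn = ctdb_get_pnn(ctdb),
                                       .srvid = reply_srvid };
     TDB_DATA d = { .dptr = (uint8_t *)&msg, .dsize = sizeof(msg) };
     ctdb_client_send_message(ctdb, recmaster_pnn, CTDB_SRVID_MEM_DUMP, d);

   The dump text then arrives as a single message on reply_srvid. */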
1697
1698 /*
1699   handler for reload_nodes
1700 */
1701 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1702                                  void *private_data)
1703 {
1704         struct ctdb_recoverd *rec = talloc_get_type(
1705                 private_data, struct ctdb_recoverd);
1706
1707         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1708
1709         ctdb_load_nodes_file(rec->ctdb);
1710 }
1711
1712
1713 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1714                                         void *private_data)
1715 {
1716         struct ctdb_recoverd *rec = talloc_get_type(
1717                 private_data, struct ctdb_recoverd);
1718         struct ctdb_context *ctdb = rec->ctdb;
1719         uint32_t pnn;
1720         uint32_t *t;
1721         int len;
1722
1723         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1724                 return;
1725         }
1726
1727         if (data.dsize != sizeof(uint32_t)) {
1728                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(uint32_t)));
1729                 return;
1730         }
1731
1732         pnn = *(uint32_t *)&data.dptr[0];
1733
1734         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1735
1736         /* Copy any existing list of nodes.  There's probably some
1737          * sort of realloc variant that will do this but we need to
1738          * make sure that freeing the old array also cancels the timer
1739          * event for the timeout... not sure if realloc will do that.
1740          */
1741         len = (rec->force_rebalance_nodes != NULL) ?
1742                 talloc_array_length(rec->force_rebalance_nodes) :
1743                 0;
1744
1745         /* This allows duplicates to be added but they don't cause
1746          * harm.  A call to add a duplicate PNN arguably means that
1747          * the timeout should be reset, so this is the simplest
1748          * solution.
1749          */
1750         t = talloc_zero_array(rec, uint32_t, len+1);
1751         CTDB_NO_MEMORY_VOID(ctdb, t);
1752         if (len > 0) {
1753                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1754         }
1755         t[len] = pnn;
1756
1757         talloc_free(rec->force_rebalance_nodes);
1758
1759         rec->force_rebalance_nodes = t;
1760 }
1761
1762
1763
1764 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1765                                     TDB_DATA data,
1766                                     struct ctdb_op_state *op_state)
1767 {
1768         struct ctdb_disable_message *r;
1769         uint32_t timeout;
1770         TDB_DATA result;
1771         int32_t ret = 0;
1772
1773         /* Validate input data */
1774         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1775                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
1776                                  "expecting %lu\n", (long unsigned)data.dsize,
1777                                  (long unsigned)sizeof(struct ctdb_disable_message)));
1778                 return;
1779         }
1780         if (data.dptr == NULL) {
1781                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1782                 return;
1783         }
1784
1785         r = (struct ctdb_disable_message *)data.dptr;
1786         timeout = r->timeout;
1787
1788         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1789         if (ret != 0) {
1790                 goto done;
1791         }
1792
1793         /* Returning our PNN tells the caller that we succeeded */
1794         ret = ctdb_get_pnn(ctdb);
1795 done:
1796         result.dsize = sizeof(int32_t);
1797         result.dptr  = (uint8_t *)&ret;
1798         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1799 }
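
/* Reply convention for the two disable handlers below: a successful
   disable returns this node's PNN (a non-negative int32) to the sender,
   while a failure in ctdb_op_disable() is reported back as its
   (negative) error value, so the caller can tell the two cases apart. */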
1800
1801 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1802                                           void *private_data)
1803 {
1804         struct ctdb_recoverd *rec = talloc_get_type(
1805                 private_data, struct ctdb_recoverd);
1806
1807         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1808 }
1809
1810 /* Backward compatibility for this SRVID */
1811 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
1812                                      void *private_data)
1813 {
1814         struct ctdb_recoverd *rec = talloc_get_type(
1815                 private_data, struct ctdb_recoverd);
1816         uint32_t timeout;
1817
1818         if (data.dsize != sizeof(uint32_t)) {
1819                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1820                                  "expecting %lu\n", (long unsigned)data.dsize,
1821                                  (long unsigned)sizeof(uint32_t)));
1822                 return;
1823         }
1824         if (data.dptr == NULL) {
1825                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1826                 return;
1827         }
1828
1829         timeout = *((uint32_t *)data.dptr);
1830
1831         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
1832 }
1833
1834 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1835                                        void *private_data)
1836 {
1837         struct ctdb_recoverd *rec = talloc_get_type(
1838                 private_data, struct ctdb_recoverd);
1839
1840         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1841 }
1842
1843 /*
1844   handler for ip reallocate, just add it to the list of requests and 
1845   handle this later in the monitor_cluster loop so we do not recurse
1846   with other requests to takeover_run()
1847 */
1848 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1849                                   void *private_data)
1850 {
1851         struct ctdb_srvid_message *request;
1852         struct ctdb_recoverd *rec = talloc_get_type(
1853                 private_data, struct ctdb_recoverd);
1854
1855         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1856                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1857                 return;
1858         }
1859
1860         request = (struct ctdb_srvid_message *)data.dptr;
1861
1862         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1863 }
1864
1865 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1866                                           struct ctdb_recoverd *rec)
1867 {
1868         TDB_DATA result;
1869         int32_t ret;
1870         struct srvid_requests *current;
1871
1872         /* Only process requests that are currently pending.  More
1873          * might come in while the takeover run is in progress and
1874          * they will need to be processed later since they might
1875          * be in response to flag changes.
1876          */
1877         current = rec->reallocate_requests;
1878         rec->reallocate_requests = NULL;
1879
1880         if (do_takeover_run(rec, rec->nodemap)) {
1881                 ret = ctdb_get_pnn(ctdb);
1882         } else {
1883                 ret = -1;
1884         }
1885
1886         result.dsize = sizeof(int32_t);
1887         result.dptr  = (uint8_t *)&ret;
1888
1889         srvid_requests_reply(ctdb, &current, result);
1890 }
1891
1892 /*
1893  * handler for assigning banning credits
1894  */
1895 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1896 {
1897         struct ctdb_recoverd *rec = talloc_get_type(
1898                 private_data, struct ctdb_recoverd);
1899         uint32_t ban_pnn;
1900
1901         /* Ignore if we are not recmaster */
1902         if (rec->ctdb->pnn != rec->recmaster) {
1903                 return;
1904         }
1905
1906         if (data.dsize != sizeof(uint32_t)) {
1907                 DEBUG(DEBUG_ERR, (__location__ " invalid data size %zu\n",
1908                                   data.dsize));
1909                 return;
1910         }
1911
1912         ban_pnn = *(uint32_t *)data.dptr;
1913
1914         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1915 }
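
/* ctdb_set_culprit_count() with nodemap->num credits charges the target
   node one credit per cluster node in a single step; presumably this is
   enough to push it over the ban threshold on the next ban check. */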
1916
1917 /*
1918   handler for recovery master elections
1919 */
1920 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1921 {
1922         struct ctdb_recoverd *rec = talloc_get_type(
1923                 private_data, struct ctdb_recoverd);
1924         struct ctdb_context *ctdb = rec->ctdb;
1925         int ret;
1926         struct election_message *em = (struct election_message *)data.dptr;
1927
1928         /* Ignore election packets from ourself */
1929         if (ctdb->pnn == em->pnn) {
1930                 return;
1931         }
1932
1933         /* we got an election packet - update the timeout for the election */
1934         talloc_free(rec->election_timeout);
1935         rec->election_timeout = tevent_add_timer(
1936                         ctdb->ev, ctdb,
1937                         fast_start ?
1938                                 timeval_current_ofs(0, 500000) :
1939                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1940                         ctdb_election_timeout, rec);
1941
1942         /* someone called an election. check their election data
1943            and, if we disagree and would rather be the elected node,
1944            send a new election message to all other nodes
1945          */
1946         if (ctdb_election_win(rec, em)) {
1947                 if (!rec->send_election_te) {
1948                         rec->send_election_te = tevent_add_timer(
1949                                         ctdb->ev, rec,
1950                                         timeval_current_ofs(0, 500000),
1951                                         election_send_request, rec);
1952                 }
1953                 return;
1954         }
1955
1956         /* we didn't win */
1957         TALLOC_FREE(rec->send_election_te);
1958
1959         /* Release the recovery lock file */
1960         if (ctdb_recovery_have_lock(rec)) {
1961                 ctdb_recovery_unlock(rec);
1962         }
1963
1964         /* ok, let that guy become recmaster then */
1965         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1966                                      CTDB_CURRENT_NODE, em->pnn);
1967         if (ret != 0) {
1968                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1969                 return;
1970         }
1971         rec->recmaster = em->pnn;
1972
1973         return;
1974 }
1975
1976
1977 /*
1978   force the start of the election process
1979  */
1980 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1981                            struct ctdb_node_map_old *nodemap)
1982 {
1983         int ret;
1984         struct ctdb_context *ctdb = rec->ctdb;
1985
1986         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1987
1988         /* set all nodes to recovery mode to stop all internode traffic */
1989         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1990         if (ret != 0) {
1991                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1992                 return;
1993         }
1994
1995         talloc_free(rec->election_timeout);
1996         rec->election_timeout = tevent_add_timer(
1997                         ctdb->ev, ctdb,
1998                         fast_start ?
1999                                 timeval_current_ofs(0, 500000) :
2000                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2001                         ctdb_election_timeout, rec);
2002
2003         ret = send_election_request(rec, pnn);
2004         if (ret != 0) {
2005                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2006                 return;
2007         }
2008
2009         /* wait for a few seconds to collect all responses */
2010         ctdb_wait_election(rec);
2011 }
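
/* The sequence above is: freeze the cluster by switching every node to
   active recovery mode, arm the election timeout, broadcast our own
   election data, then block in ctdb_wait_election() until the election
   window has closed. */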
2012
2013
2014
2015 /*
2016   handler for when a node changes its flags
2017 */
2018 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2019 {
2020         struct ctdb_recoverd *rec = talloc_get_type(
2021                 private_data, struct ctdb_recoverd);
2022         struct ctdb_context *ctdb = rec->ctdb;
2023         int ret;
2024         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2025         struct ctdb_node_map_old *nodemap=NULL;
2026         TALLOC_CTX *tmp_ctx;
2027         int i;
2028
2029         if (data.dsize != sizeof(*c)) {
2030                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2031                 return;
2032         }
2033
2034         tmp_ctx = talloc_new(ctdb);
2035         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2036
2037         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2040                 talloc_free(tmp_ctx);
2041                 return;
2042         }
2043
2044
2045         for (i=0;i<nodemap->num;i++) {
2046                 if (nodemap->nodes[i].pnn == c->pnn) break;
2047         }
2048
2049         if (i == nodemap->num) {
2050                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2051                 talloc_free(tmp_ctx);
2052                 return;
2053         }
2054
2055         if (c->old_flags != c->new_flags) {
2056                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2057         }
2058
2059         nodemap->nodes[i].flags = c->new_flags;
2060
2061         talloc_free(tmp_ctx);
2062 }
2063
2064 /*
2065   handler for when we need to push out flag changes to all other nodes
2066 */
2067 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2068                                void *private_data)
2069 {
2070         struct ctdb_recoverd *rec = talloc_get_type(
2071                 private_data, struct ctdb_recoverd);
2072         struct ctdb_context *ctdb = rec->ctdb;
2073         int ret;
2074         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2075         struct ctdb_node_map_old *nodemap=NULL;
2076         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2077         uint32_t *nodes;
2078
2079         /* read the node flags from the recmaster */
2080         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2081                                    tmp_ctx, &nodemap);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", rec->recmaster));
2084                 talloc_free(tmp_ctx);
2085                 return;
2086         }
2087         if (c->pnn >= nodemap->num) {
2088                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2089                 talloc_free(tmp_ctx);
2090                 return;
2091         }
2092
2093         /* send the flags update to all connected nodes */
2094         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2095
2096         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2097                                       nodes, 0, CONTROL_TIMEOUT(),
2098                                       false, data,
2099                                       NULL, NULL,
2100                                       NULL) != 0) {
2101                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2102
2103                 talloc_free(tmp_ctx);
2104                 return;
2105         }
2106
2107         talloc_free(tmp_ctx);
2108 }
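
/* Failures in the flag push above are only logged; the recmaster's
   monitoring loop re-checks flag consistency on every pass, so a missed
   update should be repaired on a later iteration. */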
2109
2110
2111 struct verify_recmode_normal_data {
2112         uint32_t count;
2113         enum monitor_result status;
2114 };
2115
2116 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2117 {
2118         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2119
2120
2121         /* one more node has responded with recmode data */
2122         rmdata->count--;
2123
2124         /* if we failed to get the recmode, then return an error and let
2125            the main loop try again.
2126         */
2127         if (state->state != CTDB_CONTROL_DONE) {
2128                 if (rmdata->status == MONITOR_OK) {
2129                         rmdata->status = MONITOR_FAILED;
2130                 }
2131                 return;
2132         }
2133
2134         /* if we got a response, then the recmode will be stored in the
2135            status field
2136         */
2137         if (state->status != CTDB_RECOVERY_NORMAL) {
2138                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2139                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2140         }
2141
2142         return;
2143 }
2144
2145
2146 /* verify that all nodes are in normal recovery mode */
2147 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2148 {
2149         struct verify_recmode_normal_data *rmdata;
2150         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2151         struct ctdb_client_control_state *state;
2152         enum monitor_result status;
2153         int j;
2154         
2155         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2156         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2157         rmdata->count  = 0;
2158         rmdata->status = MONITOR_OK;
2159
2160         /* loop over all active nodes and send an async getrecmode call to 
2161            them */
2162         for (j=0; j<nodemap->num; j++) {
2163                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2164                         continue;
2165                 }
2166                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2167                                         CONTROL_TIMEOUT(), 
2168                                         nodemap->nodes[j].pnn);
2169                 if (state == NULL) {
2170                         /* we failed to send the control, treat this as 
2171                            an error and try again next iteration
2172                         */                      
2173                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2174                         talloc_free(mem_ctx);
2175                         return MONITOR_FAILED;
2176                 }
2177
2178                 /* set up the callback functions */
2179                 state->async.fn = verify_recmode_normal_callback;
2180                 state->async.private_data = rmdata;
2181
2182                 /* one more control to wait for to complete */
2183                 rmdata->count++;
2184         }
2185
2186
2187         /* now wait for up to the maximum number of seconds allowed
2188            or until all nodes we expect a response from have replied
2189         */
2190         while (rmdata->count > 0) {
2191                 tevent_loop_once(ctdb->ev);
2192         }
2193
2194         status = rmdata->status;
2195         talloc_free(mem_ctx);
2196         return status;
2197 }
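
/* verify_recmode() above and verify_recmaster() below share the same
   async fan-out/fan-in pattern: one control is sent per eligible node,
   rmdata->count tracks the outstanding replies, each callback decrements
   it, and tevent_loop_once() pumps events until every reply (or timeout)
   has been processed. */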
2198
2199
2200 struct verify_recmaster_data {
2201         struct ctdb_recoverd *rec;
2202         uint32_t count;
2203         uint32_t pnn;
2204         enum monitor_result status;
2205 };
2206
2207 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2208 {
2209         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2210
2211
2212         /* one more node has responded with recmaster data */
2213         rmdata->count--;
2214
2215         /* if we failed to get the recmaster, then return an error and let
2216            the main loop try again.
2217         */
2218         if (state->state != CTDB_CONTROL_DONE) {
2219                 if (rmdata->status == MONITOR_OK) {
2220                         rmdata->status = MONITOR_FAILED;
2221                 }
2222                 return;
2223         }
2224
2225         /* if we got a response, then the recmaster will be stored in the
2226            status field
2227         */
2228         if (state->status != rmdata->pnn) {
2229                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2230                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2231                 rmdata->status = MONITOR_ELECTION_NEEDED;
2232         }
2233
2234         return;
2235 }
2236
2237
2238 /* verify that all nodes agree that we are the recmaster */
2239 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
2240 {
2241         struct ctdb_context *ctdb = rec->ctdb;
2242         struct verify_recmaster_data *rmdata;
2243         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2244         struct ctdb_client_control_state *state;
2245         enum monitor_result status;
2246         int j;
2247         
2248         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2249         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2250         rmdata->rec    = rec;
2251         rmdata->count  = 0;
2252         rmdata->pnn    = pnn;
2253         rmdata->status = MONITOR_OK;
2254
2255         /* loop over all active nodes and send an async getrecmaster call to
2256            them */
2257         for (j=0; j<nodemap->num; j++) {
2258                 if (nodemap->nodes[j].pnn == rec->recmaster) {
2259                         continue;
2260                 }
2261                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2262                         continue;
2263                 }
2264                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2265                                         CONTROL_TIMEOUT(),
2266                                         nodemap->nodes[j].pnn);
2267                 if (state == NULL) {
2268                         /* we failed to send the control, treat this as 
2269                            an error and try again next iteration
2270                         */                      
2271                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2272                         talloc_free(mem_ctx);
2273                         return MONITOR_FAILED;
2274                 }
2275
2276                 /* set up the callback functions */
2277                 state->async.fn = verify_recmaster_callback;
2278                 state->async.private_data = rmdata;
2279
2280                 /* one more control to wait for to complete */
2281                 rmdata->count++;
2282         }
2283
2284
2285         /* now wait for up to the maximum number of seconds allowed
2286            or until all nodes we expect a response from have replied
2287         */
2288         while (rmdata->count > 0) {
2289                 tevent_loop_once(ctdb->ev);
2290         }
2291
2292         status = rmdata->status;
2293         talloc_free(mem_ctx);
2294         return status;
2295 }
2296
2297 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2298                                     struct ctdb_recoverd *rec)
2299 {
2300         struct ctdb_iface_list_old *ifaces = NULL;
2301         TALLOC_CTX *mem_ctx;
2302         bool ret = false;
2303
2304         mem_ctx = talloc_new(NULL);
2305
2306         /* Read the interfaces from the local node */
2307         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2308                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2309                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2310                 /* We could return an error.  However, this will be
2311                  * rare so we'll decide that the interfaces have
2312                  * actually changed, just in case.
2313                  */
2314                 talloc_free(mem_ctx);
2315                 return true;
2316         }
2317
2318         if (!rec->ifaces) {
2319                 /* We haven't been here before so things have changed */
2320                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2321                 ret = true;
2322         } else if (rec->ifaces->num != ifaces->num) {
2323                 /* Number of interfaces has changed */
2324                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2325                                      rec->ifaces->num, ifaces->num));
2326                 ret = true;
2327         } else {
2328                 /* See if interface names or link states have changed */
2329                 int i;
2330                 for (i = 0; i < rec->ifaces->num; i++) {
2331                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2332                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2333                                 DEBUG(DEBUG_NOTICE,
2334                                       ("Interface in slot %d changed: %s => %s\n",
2335                                        i, iface->name, ifaces->ifaces[i].name));
2336                                 ret = true;
2337                                 break;
2338                         }
2339                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2340                                 DEBUG(DEBUG_NOTICE,
2341                                       ("Interface %s changed state: %d => %d\n",
2342                                        iface->name, iface->link_state,
2343                                        ifaces->ifaces[i].link_state));
2344                                 ret = true;
2345                                 break;
2346                         }
2347                 }
2348         }
2349
2350         talloc_free(rec->ifaces);
2351         rec->ifaces = talloc_steal(rec, ifaces);
2352
2353         talloc_free(mem_ctx);
2354         return ret;
2355 }
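
/* Note that rec->ifaces is replaced unconditionally above, so the next
   call always compares against the most recent snapshot rather than the
   one that first triggered a change. */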
2356
2357 /* Check that the local allocation of public IP addresses is correct
2358  * and do some house-keeping */
2359 static int verify_local_ip_allocation(struct ctdb_context *ctdb,
2360                                       struct ctdb_recoverd *rec,
2361                                       uint32_t pnn,
2362                                       struct ctdb_node_map_old *nodemap)
2363 {
2364         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2365         int ret, j;
2366         bool need_takeover_run = false;
2367         struct ctdb_public_ip_list_old *ips = NULL;
2368
2369         /* If we are not the recmaster then do some housekeeping */
2370         if (rec->recmaster != pnn) {
2371                 /* Ignore any IP reallocate requests - only recmaster
2372                  * processes them
2373                  */
2374                 TALLOC_FREE(rec->reallocate_requests);
2375                 /* Clear any nodes that should be force rebalanced in
2376                  * the next takeover run.  If the recovery master role
2377                  * has moved then we don't want to process these some
2378                  * time in the future.
2379                  */
2380                 TALLOC_FREE(rec->force_rebalance_nodes);
2381         }
2382
2383         /* Return early if disabled... */
2384         if (ctdb_config.failover_disabled ||
2385             ctdb_op_is_disabled(rec->takeover_run)) {
2386                 return 0;
2387         }
2388
2389         if (interfaces_have_changed(ctdb, rec)) {
2390                 need_takeover_run = true;
2391         }
2392
2393         /* If there are unhosted IPs but this node can host them then
2394          * trigger an IP reallocation */
2395
2396         /* Read *available* IPs from local node */
2397         ret = ctdb_ctrl_get_public_ips_flags(
2398                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
2399                 CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2400         if (ret != 0) {
2401                 DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
2402                 talloc_free(mem_ctx);
2403                 return -1;
2404         }
2405
2406         for (j=0; j<ips->num; j++) {
2407                 if (ips->ips[j].pnn == -1 &&
2408                     nodemap->nodes[pnn].flags == 0) {
2409                         DEBUG(DEBUG_WARNING,
2410                               ("Unassigned IP %s can be served by this node\n",
2411                                ctdb_addr_to_str(&ips->ips[j].addr)));
2412                         need_takeover_run = true;
2413                 }
2414         }
2415
2416         talloc_free(ips);
2417
2418         if (!ctdb->do_checkpublicip) {
2419                 goto done;
2420         }
2421
2422         /* Validate the IP addresses that this node has on network
2423          * interfaces.  If there is an inconsistency between reality
2424          * and the state expected by CTDB then try to fix it by
2425          * triggering an IP reallocation or releasing extraneous IP
2426          * addresses. */
2427
2428         /* Read *known* IPs from local node */
2429         ret = ctdb_ctrl_get_public_ips_flags(
2430                 ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2431         if (ret != 0) {
2432                 DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
2433                 talloc_free(mem_ctx);
2434                 return -1;
2435         }
2436
2437         for (j=0; j<ips->num; j++) {
2438                 if (ips->ips[j].pnn == pnn) {
2439                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2440                                 DEBUG(DEBUG_ERR,
2441                                       ("Assigned IP %s not on an interface\n",
2442                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2443                                 need_takeover_run = true;
2444                         }
2445                 } else {
2446                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2447                                 DEBUG(DEBUG_ERR,
2448                                       ("IP %s incorrectly on an interface\n",
2449                                        ctdb_addr_to_str(&ips->ips[j].addr)));
2450                                 need_takeover_run = true;
2451                         }
2452                 }
2453         }
2454
2455 done:
2456         if (need_takeover_run) {
2457                 struct ctdb_srvid_message rd;
2458                 TDB_DATA data;
2459
2460                 DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
2461
2462                 ZERO_STRUCT(rd);
2463                 rd.pnn = ctdb->pnn;
2464                 rd.srvid = 0;
2465                 data.dptr = (uint8_t *)&rd;
2466                 data.dsize = sizeof(rd);
2467
2468                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2469                 if (ret != 0) {
2470                         DEBUG(DEBUG_ERR,
2471                               ("Failed to send takeover run request\n"));
2472                 }
2473         }
2474         talloc_free(mem_ctx);
2475         return 0;
2476 }
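
/* The function above makes two passes over the public IP state: the
   first uses only the *available* addresses to spot unhosted IPs this
   node could serve, the second uses all *known* addresses to compare
   CTDB's assignments against what is actually present on the
   network interfaces. */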
2477
2478
2479 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2480 {
2481         struct ctdb_node_map_old **remote_nodemaps = callback_data;
2482
2483         if (node_pnn >= ctdb->num_nodes) {
2484                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2485                 return;
2486         }
2487
2488         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
2489
2490 }
2491
2492 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2493         struct ctdb_node_map_old *nodemap,
2494         struct ctdb_node_map_old **remote_nodemaps)
2495 {
2496         uint32_t *nodes;
2497
2498         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2499         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2500                                         nodes, 0,
2501                                         CONTROL_TIMEOUT(), false, tdb_null,
2502                                         async_getnodemap_callback,
2503                                         NULL,
2504                                         remote_nodemaps) != 0) {
2505                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2506
2507                 return -1;
2508         }
2509
2510         return 0;
2511 }
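
/* async_getnodemap_callback() stores each reply at remote_nodemaps[pnn],
   so entries for nodes that never answered stay NULL; callers are
   expected to check for NULL before dereferencing, as main_loop() does. */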
2512
2513 static bool validate_recovery_master(struct ctdb_recoverd *rec,
2514                                      TALLOC_CTX *mem_ctx)
2515 {
2516         struct ctdb_context *ctdb = rec->ctdb;
2517         uint32_t pnn = ctdb_get_pnn(ctdb);
2518         struct ctdb_node_map_old *nodemap = rec->nodemap;
2519         struct ctdb_node_map_old *recmaster_nodemap = NULL;
2520         int ret;
2521
2522         /* When recovery daemon is started, recmaster is set to
2523          * "unknown" so it knows to start an election.
2524          */
2525         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
2526                 DEBUG(DEBUG_NOTICE,
2527                       ("Initial recovery master set - forcing election\n"));
2528                 force_election(rec, pnn, nodemap);
2529                 return false;
2530         }
2531
2532         /*
2533          * If the current recmaster does not have CTDB_CAP_RECMASTER,
2534          * but we have, then force an election and try to become the new
2535          * recmaster.
2536          */
2537         if (!ctdb_node_has_capabilities(rec->caps,
2538                                         rec->recmaster,
2539                                         CTDB_CAP_RECMASTER) &&
2540             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
2541             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
2542                 DEBUG(DEBUG_ERR,
2543                       (" Current recmaster node %u does not have CAP_RECMASTER,"
2544                        " but we (node %u) have - force an election\n",
2545                        rec->recmaster, pnn));
2546                 force_election(rec, pnn, nodemap);
2547                 return false;
2548         }
2549
2550         /* Verify that the master node has not been deleted.  This
2551          * should not happen because a node should always be shutdown
2552          * before being deleted, causing a new master to be elected
2553          * before now.  However, if something strange has happened
2554          * then checking here will ensure we don't index beyond the
2555          * end of the nodemap array. */
2556         if (rec->recmaster >= nodemap->num) {
2557                 DEBUG(DEBUG_ERR,
2558                       ("Recmaster node %u has been deleted. Force election\n",
2559                        rec->recmaster));
2560                 force_election(rec, pnn, nodemap);
2561                 return false;
2562         }
2563
2564         /* if recovery master is disconnected/deleted we must elect a new recmaster */
2565         if (nodemap->nodes[rec->recmaster].flags &
2566             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
2567                 DEBUG(DEBUG_NOTICE,
2568                       ("Recmaster node %u is disconnected/deleted. Force election\n",
2569                        rec->recmaster));
2570                 force_election(rec, pnn, nodemap);
2571                 return false;
2572         }
2573
2574         /* get nodemap from the recovery master to check if it is inactive */
2575         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2576                                    mem_ctx, &recmaster_nodemap);
2577         if (ret != 0) {
2578                 DEBUG(DEBUG_ERR,
2579                       (__location__
2580                        " Unable to get nodemap from recovery master %u\n",
2581                           rec->recmaster));
2582                 /* No election, just error */
2583                 return false;
2584         }
2585
2586
2587         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
2588             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
2589                 DEBUG(DEBUG_NOTICE,
2590                       ("Recmaster node %u is inactive. Force election\n",
2591                        rec->recmaster));
2592                 /*
2593                  * update our nodemap to carry the recmaster's notion of
2594                  * its own flags, so that we don't keep freezing the
2595                  * inactive recmaster node...
2596                  */
2597                 nodemap->nodes[rec->recmaster].flags =
2598                         recmaster_nodemap->nodes[rec->recmaster].flags;
2599                 force_election(rec, pnn, nodemap);
2600                 return false;
2601         }
2602
2603         return true;
2604 }
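
/* Summary of the checks above: an election is forced when the recmaster
   is still unknown, lacks CTDB_CAP_RECMASTER while we have it, has been
   deleted from the nodemap, is disconnected, or reports itself inactive
   while we are active; main_loop() only carries on as recmaster when
   none of these conditions hold. */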
2605
2606 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2607                       TALLOC_CTX *mem_ctx)
2608 {
2609         uint32_t pnn;
2610         struct ctdb_node_map_old *nodemap=NULL;
2611         struct ctdb_node_map_old **remote_nodemaps=NULL;
2612         struct ctdb_vnn_map *vnnmap=NULL;
2613         struct ctdb_vnn_map *remote_vnnmap=NULL;
2614         uint32_t num_lmasters;
2615         int32_t debug_level;
2616         int i, j, ret;
2617         bool self_ban;
2618
2619
2620         /* verify that the main daemon is still running */
2621         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2622                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2623                 exit(-1);
2624         }
2625
2626         /* ping the local daemon to tell it we are alive */
2627         ctdb_ctrl_recd_ping(ctdb);
2628
2629         if (rec->election_timeout) {
2630                 /* an election is in progress */
2631                 return;
2632         }
2633
2634         /* read the debug level from the parent and update locally */
2635         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2636         if (ret != 0) {
2637                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2638                 return;
2639         }
2640         debuglevel_set(debug_level);
2641
2642         /* get relevant tunables */
2643         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2644         if (ret != 0) {
2645                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2646                 return;
2647         }
2648
2649         /* get runstate */
2650         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2651                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2652         if (ret != 0) {
2653                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2654                 return;
2655         }
2656
2657         pnn = ctdb_get_pnn(ctdb);
2658
2659         /* get nodemap */
2660         TALLOC_FREE(rec->nodemap);
2661         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2662         if (ret != 0) {
2663                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2664                 return;
2665         }
2666         nodemap = rec->nodemap;
2667
2668         /* remember our own node flags */
2669         rec->node_flags = nodemap->nodes[pnn].flags;
2670
2671         ban_misbehaving_nodes(rec, &self_ban);
2672         if (self_ban) {
2673                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2674                 return;
2675         }
2676
2677         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2678                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2679         if (ret != 0) {
2680                 D_ERR("Failed to read recmode from local node\n");
2681                 return;
2682         }
2683
2684         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2685            also frozen and that the recmode is set to active.
2686         */
2687         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2688                 /* If this node has become inactive then we want to
2689                  * reduce the chances of it taking over the recovery
2690                  * master role when it becomes active again.  This
2691                  * helps to stabilise the recovery master role so that
2692                  * it stays on the most stable node.
2693                  */
2694                 rec->priority_time = timeval_current();
2695
2696                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2697                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2698
2699                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2700                         if (ret != 0) {
2701                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2702
2703                                 return;
2704                         }
2705                 }
2706                 if (! rec->frozen_on_inactive) {
2707                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2708                                                CTDB_CURRENT_NODE);
2709                         if (ret != 0) {
2710                                 DEBUG(DEBUG_ERR,
2711                                       (__location__ " Failed to freeze node "
2712                                        "in STOPPED or BANNED state\n"));
2713                                 return;
2714                         }
2715
2716                         rec->frozen_on_inactive = true;
2717                 }
2718
2719                 /* If this node is stopped or banned then it is not the recovery
2720                  * master, so don't do anything. This prevents a stopped or banned
2721                  * node from starting an election and sending unnecessary controls.
2722                  */
2723                 return;
2724         }
2725
2726         rec->frozen_on_inactive = false;
2727
2728         /* Retrieve capabilities from all connected nodes */
2729         ret = update_capabilities(rec, nodemap);
2730         if (ret != 0) {
2731                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2732                 return;
2733         }
2734
2735         if (! validate_recovery_master(rec, mem_ctx)) {
2736                 return;
2737         }
2738
2739         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2740                 /* Check if an IP takeover run is needed and trigger one if
2741                  * necessary */
2742                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2743         }
2744
2745         /* if we are not the recmaster then we do not need to check
2746            if recovery is needed
2747          */
2748         if (pnn != rec->recmaster) {
2749                 return;
2750         }
2751
2752
2753         /* ensure our local copies of flags are right */
2754         ret = update_local_flags(rec, nodemap);
2755         if (ret != 0) {
2756                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2757                 return;
2758         }
2759
2760         if (ctdb->num_nodes != nodemap->num) {
2761                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2762                 ctdb_load_nodes_file(ctdb);
2763                 return;
2764         }
2765
2766         /* verify that all active nodes agree that we are the recmaster */
2767         switch (verify_recmaster(rec, nodemap, pnn)) {
2768         case MONITOR_RECOVERY_NEEDED:
2769                 /* cannot happen */
2770                 return;
2771         case MONITOR_ELECTION_NEEDED:
2772                 force_election(rec, pnn, nodemap);
2773                 return;
2774         case MONITOR_OK:
2775                 break;
2776         case MONITOR_FAILED:
2777                 return;
2778         }
2779
2780
2781         /* get the vnnmap */
2782         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2783         if (ret != 0) {
2784                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2785                 return;
2786         }
2787
2788         if (rec->need_recovery) {
2789                 /* a previous recovery didn't finish */
2790                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2791                 return;
2792         }
2793
2794         /* verify that all active nodes are in normal mode 
2795            and not in recovery mode 
2796         */
2797         switch (verify_recmode(ctdb, nodemap)) {
2798         case MONITOR_RECOVERY_NEEDED:
2799                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2800                 return;
2801         case MONITOR_FAILED:
2802                 return;
2803         case MONITOR_ELECTION_NEEDED:
2804                 /* cannot happen */
2805         case MONITOR_OK:
2806                 break;
2807         }
2808
2809
2810         if (ctdb->recovery_lock != NULL) {
2811                 /* We must already hold the recovery lock */
2812                 if (!ctdb_recovery_have_lock(rec)) {
2813                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2814                         ctdb_set_culprit(rec, ctdb->pnn);
2815                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2816                         return;
2817                 }
2818         }
2819
2820
2821         /* If recoveries are disabled then there is no use doing any
2822          * nodemap or flags checks.  Recoveries might be disabled due
2823          * to "reloadnodes", so doing these checks might cause an
2824          * unnecessary recovery.  */
2825         if (ctdb_op_is_disabled(rec->recovery)) {
2826                 goto takeover_run_checks;
2827         }
2828
2829         /* get the nodemap for all active remote nodes
2830          */
2831         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
2832         if (remote_nodemaps == NULL) {
2833                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2834                 return;
2835         }
2836         for (i=0; i<nodemap->num; i++) {
2837                 remote_nodemaps[i] = NULL;
2838         }
2839         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2840                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2841                 return;
2842         } 
2843
2844         /* verify that all other nodes have the same nodemap as we have
2845         */
2846         for (j=0; j<nodemap->num; j++) {
2847                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2848                         continue;
2849                 }
2850
2851                 if (remote_nodemaps[j] == NULL) {
2852                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2853                         ctdb_set_culprit(rec, j);
2854
2855                         return;
2856                 }
2857
2858                 /* if the nodes disagree on how many nodes there are
2859                    then this is a good reason to try recovery
2860                  */
2861                 if (remote_nodemaps[j]->num != nodemap->num) {
2862                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2863                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2864                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2865                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2866                         return;
2867                 }
2868
2869                 /* if the nodes disagree on which nodes exist and are
2870                    active, then that is also a good reason to do recovery
2871                  */
2872                 for (i=0;i<nodemap->num;i++) {
2873                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2874                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2875                                           nodemap->nodes[j].pnn, i, 
2876                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2877                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2878                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2879                                             vnnmap);
2880                                 return;
2881                         }
2882                 }
2883         }
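             /* At this point every active node agrees on cluster membership */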
2884
2885         /*
2886          * Update node flags obtained from each active node. This ensures we have
2887          * up-to-date information for all the nodes.
2888          */
2889         for (j=0; j<nodemap->num; j++) {
2890                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2891                         continue;
2892                 }
2893                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2894         }
2895
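             /* Verify flag consistency.  On a mismatch, the conflicting
              * node is treated as authoritative for its own flags, while
              * this node (the local recmaster) is authoritative for every
              * other node's flags. */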
2896         for (j=0; j<nodemap->num; j++) {
2897                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2898                         continue;
2899                 }
2900
2901                 /* verify the flags are consistent
2902                 */
2903                 for (i=0; i<nodemap->num; i++) {
2904                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2905                                 continue;
2906                         }
2907
2908                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2909                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2910                                   nodemap->nodes[j].pnn, 
2911                                   nodemap->nodes[i].pnn, 
2912                                   remote_nodemaps[j]->nodes[i].flags,
2913                                   nodemap->nodes[i].flags));
2914                                 if (i == j) {
2915                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2916                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2917                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2918                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2919                                                     vnnmap);
2920                                         return;
2921                                 } else {
2922                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2923                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2924                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2925                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2926                                                     vnnmap);
2927                                         return;
2928                                 }
2929                         }
2930                 }
2931         }
2932
2933
2934         /* count how many active nodes have the lmaster capability */
2935         num_lmasters = 0;
2936         for (i=0; i<nodemap->num; i++) {
2937                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2938                         if (ctdb_node_has_capabilities(rec->caps,
2939                                                        ctdb->nodes[i]->pnn,
2940                                                        CTDB_CAP_LMASTER)) {
2941                                 num_lmasters++;
2942                         }
2943                 }
2944         }
2945
2946
2947         /* There must be the same number of lmasters in the vnn map as
2948          * there are active nodes with the lmaster capability...  or
2949          * do a recovery.
2950          */
2951         if (vnnmap->size != num_lmasters) {
2952                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2953                           vnnmap->size, num_lmasters));
2954                 ctdb_set_culprit(rec, ctdb->pnn);
2955                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2956                 return;
2957         }
2958
2959         /* verify that all active nodes in the nodemap also exist in 
2960            the vnnmap.
2961          */
2962         for (j=0; j<nodemap->num; j++) {
2963                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2964                         continue;
2965                 }
2966                 if (nodemap->nodes[j].pnn == pnn) {
2967                         continue;
2968                 }
2969
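                     /* Linear search: falling off the end of the loop
                      * (i == vnnmap->size) means the pnn was not found. */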
2970                 for (i=0; i<vnnmap->size; i++) {
2971                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2972                                 break;
2973                         }
2974                 }
2975                 if (i == vnnmap->size) {
2976                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2977                                   nodemap->nodes[j].pnn));
2978                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2979                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2980                         return;
2981                 }
2982         }
2983
2984
2985         /* verify that all other nodes have the same vnnmap
2986            and are from the same generation
2987          */
2988         for (j=0; j<nodemap->num; j++) {
2989                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2990                         continue;
2991                 }
2992                 if (nodemap->nodes[j].pnn == pnn) {
2993                         continue;
2994                 }
2995
2996                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2997                                           mem_ctx, &remote_vnnmap);
2998                 if (ret != 0) {
2999                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3000                                   nodemap->nodes[j].pnn));
3001                         return;
3002                 }
3003
3004                 /* verify the vnnmap generation is the same */
3005                 if (vnnmap->generation != remote_vnnmap->generation) {
3006                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3007                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3008                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3009                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3010                         return;
3011                 }
3012
3013                 /* verify the vnnmap size is the same */
3014                 if (vnnmap->size != remote_vnnmap->size) {
3015                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3016                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3017                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3018                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3019                         return;
3020                 }
3021
3022                 /* verify the vnnmap is the same */
3023                 for (i=0;i<vnnmap->size;i++) {
3024                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3025                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3026                                           nodemap->nodes[j].pnn));
3027                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3028                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3029                                             vnnmap);
3030                                 return;
3031                         }
3032                 }
3033         }
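             /* The vnnmap matches on every active node; the cluster looks
              * consistent, so no recovery is needed this iteration. */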
3034
3035         /* FIXME: Add remote public IP checking to ensure that nodes
3036          * have the IP addresses that are allocated to them. */
3037
3038 takeover_run_checks:
3039
3040         /* If there are IP takeover runs requested or the previous one
3041          * failed then perform one and notify the waiters */
3042         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3043             (rec->reallocate_requests || rec->need_takeover_run)) {
3044                 process_ipreallocate_requests(ctdb, rec);
3045         }
3046 }
3047
3048 static void recd_sig_term_handler(struct tevent_context *ev,
3049                                   struct tevent_signal *se, int signum,
3050                                   int count, void *dont_care,
3051                                   void *private_data)
3052 {
3053         struct ctdb_recoverd *rec = talloc_get_type_abort(
3054                 private_data, struct ctdb_recoverd);
3055
3056         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
3057         ctdb_recovery_unlock(rec);
3058         exit(0);
3059 }
3060
3061
3062 /*
3063   the main monitoring loop
3064  */
3065 static void monitor_cluster(struct ctdb_context *ctdb)
3066 {
3067         struct tevent_signal *se;
3068         struct ctdb_recoverd *rec;
3069
3070         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3071
3072         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3073         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3074
3075         rec->ctdb = ctdb;
3076         rec->recmaster = CTDB_UNKNOWN_PNN;
3077         rec->recovery_lock_handle = NULL;
3078
3079         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3080         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3081
3082         rec->recovery = ctdb_op_init(rec, "recoveries");
3083         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3084
3085         rec->priority_time = timeval_current();
3086         rec->frozen_on_inactive = false;
3087
3088         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3089                                recd_sig_term_handler, rec);
3090         if (se == NULL) {
3091                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3092                 exit(1);
3093         }
3094
3095         /* register a message port for sending memory dumps */
3096         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3097
3098         /* when a node is assigned banning credits */
3099         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3100                                         banning_handler, rec);
3101
3102         /* register a message port for recovery elections */
3103         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3104
3105         /* when nodes are disabled/enabled */
3106         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3107
3108         /* when we are asked to push out a flag change */
3109         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3110
3111         /* register a message port for vacuum fetch */
3112         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3113
3114         /* register a message port for reloadnodes */
3115         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3116
3117         /* register a message port for performing a takeover run */
3118         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3119
3120         /* register a message port for disabling the ip check for a short while */
3121         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3122
3123         /* register a message port for forcing a rebalance of a node at the
3124            next reallocation */
3125         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3126
3127         /* Register a message port for disabling takeover runs */
3128         ctdb_client_set_message_handler(ctdb,
3129                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3130                                         disable_takeover_runs_handler, rec);
3131
3132         /* Register a message port for disabling recoveries */
3133         ctdb_client_set_message_handler(ctdb,
3134                                         CTDB_SRVID_DISABLE_RECOVERIES,
3135                                         disable_recoveries_handler, rec);
3136
3137         /* register a message port for detaching database */
3138         ctdb_client_set_message_handler(ctdb,
3139                                         CTDB_SRVID_DETACH_DATABASE,
3140                                         detach_database_handler, rec);
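             /* Any of the handlers registered above can be driven from a
              * ctdb client via ctdb_client_send_message().  A minimal
              * sketch (assuming an initialised client context "client";
              * note the payload format is handler specific -- most expect
              * a struct ctdb_srvid_message carrying the reply address):
              *
              *   TDB_DATA data = { .dptr = NULL, .dsize = 0 };
              *   ctdb_client_send_message(client, CTDB_BROADCAST_ALL,
              *                            CTDB_SRVID_TAKEOVER_RUN, data);
              */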
3141
3142         for (;;) {
3143                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3144                 struct timeval start;
3145                 double elapsed;
3146
3147                 if (!mem_ctx) {
3148                         DEBUG(DEBUG_CRIT,(__location__
3149                                           " Failed to create temp context\n"));
3150                         exit(-1);
3151                 }
3152
3153                 start = timeval_current();
3154                 main_loop(ctdb, rec, mem_ctx);
3155                 talloc_free(mem_ctx);
3156
3157                 /* pace the loop: re-run the checks at most once per
3158                    recover_interval seconds */
3158                 elapsed = timeval_elapsed(&start);
3159                 if (elapsed < ctdb->tunable.recover_interval) {
3160                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3161                                           - elapsed);
3162                 }
3163         }
3164 }
3165
3166 /*
3167   event handler for when the main ctdbd dies
3168  */
3169 static void ctdb_recoverd_parent(struct tevent_context *ev,
3170                                  struct tevent_fd *fde,
3171                                  uint16_t flags, void *private_data)
3172 {
3173         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3174         _exit(1);
3175 }
3176
3177 /*
3178   called regularly to verify that the recovery daemon is still running
3179  */
3180 static void ctdb_check_recd(struct tevent_context *ev,
3181                             struct tevent_timer *te,
3182                             struct timeval yt, void *p)
3183 {
3184         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3185
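             /* ctdb_kill() with signal 0 only probes whether the process
              * still exists; no signal is actually delivered. */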
3186         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3187                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3188
3189                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3190                                  ctdb_restart_recd, ctdb);
3191
3192                 return;
3193         }
3194
3195         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3196                          timeval_current_ofs(30, 0),
3197                          ctdb_check_recd, ctdb);
3198 }
3199
3200 static void recd_sig_child_handler(struct tevent_context *ev,
3201                                    struct tevent_signal *se, int signum,
3202                                    int count, void *dont_care,
3203                                    void *private_data)
3204 {
3206         int status;
3207         pid_t pid = -1;
3208
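             /* Reap all exited children without blocking: waitpid() returns
              * 0 once no more exited children remain, and -1 with ECHILD
              * when there are no children at all. */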
3209         while (pid != 0) {
3210                 pid = waitpid(-1, &status, WNOHANG);
3211                 if (pid == -1) {
3212                         if (errno != ECHILD) {
3213                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3214                         }
3215                         return;
3216                 }
3217                 if (pid > 0) {
3218                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3219                 }
3220         }
3221 }
3222
3223 /*
3224   startup the recovery daemon as a child of the main ctdb daemon
3225  */
3226 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3227 {
3228         int fd[2];
3229         struct tevent_signal *se;
3230         struct tevent_fd *fde;
3231         int ret;
3232
3233         if (pipe(fd) != 0) {
3234                 return -1;
3235         }
3236
3237         ctdb->recoverd_pid = ctdb_fork(ctdb);
3238         if (ctdb->recoverd_pid == -1) {
3239                 return -1;
3240         }
3241
3242         if (ctdb->recoverd_pid != 0) {
3243                 talloc_free(ctdb->recd_ctx);
3244                 ctdb->recd_ctx = talloc_new(ctdb);
3245                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3246
3247                 close(fd[0]);
3248                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3249                                  timeval_current_ofs(30, 0),
3250                                  ctdb_check_recd, ctdb);
3251                 return 0;
3252         }
3253
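             /* Child: from here on, this process is the recovery daemon */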
3254         close(fd[1]);
3255
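             /* Re-seed the PRNG so the child does not share the random
              * sequence inherited from the parent across fork() */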
3256         srandom(getpid() ^ time(NULL));
3257
3258         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3259         if (ret != 0) {
3260                 return -1;
3261         }
3262
3263         prctl_set_comment("ctdb_recoverd");
3264         if (switch_from_server_to_client(ctdb) != 0) {
3265                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3266                 exit(1);
3267         }
3268
3269         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3270
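             /* The parent keeps fd[1] open and never writes to it, so fd[0]
              * becomes readable (EOF) only when the main ctdbd daemon dies;
              * ctdb_recoverd_parent() then terminates this process. */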
3271         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3272                             ctdb_recoverd_parent, &fd[0]);
3273         tevent_fd_set_auto_close(fde);
3274
3275         /* set up a handler to pick up sigchld */
3276         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3277                                recd_sig_child_handler, ctdb);
3278         if (se == NULL) {
3279                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3280                 exit(1);
3281         }
3282
3283         monitor_cluster(ctdb);
3284
3285         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3286         return -1;
3287 }
3288
3289 /*
3290   shutdown the recovery daemon
3291  */
3292 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3293 {
3294         if (ctdb->recoverd_pid == 0) {
3295                 return;
3296         }
3297
3298         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3299         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3300
3301         TALLOC_FREE(ctdb->recd_ctx);
3302         TALLOC_FREE(ctdb->recd_ping_count);
3303 }
3304
3305 static void ctdb_restart_recd(struct tevent_context *ev,
3306                               struct tevent_timer *te,
3307                               struct timeval t, void *private_data)
3308 {
3309         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3310
3311         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3312         ctdb_stop_recoverd(ctdb);
3313         ctdb_start_recoverd(ctdb);
3314 }