aeb23276fe7d49fcb5e32a7e2ca3eec7d82dfe71
[amitay/samba.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36 #include "lib/util/util_process.h"
37
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/system_socket.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45 #include "server/ctdb_config.h"
46
47 #include "ctdb_cluster_mutex.h"
48
49 /* List of SRVID requests that need to be processed */
50 struct srvid_list {
51         struct srvid_list *next, *prev;
52         struct ctdb_srvid_message *request;
53 };
54
55 struct srvid_requests {
56         struct srvid_list *requests;
57 };
58
59 static void srvid_request_reply(struct ctdb_context *ctdb,
60                                 struct ctdb_srvid_message *request,
61                                 TDB_DATA result)
62 {
63         /* Someone that sent srvid==0 does not want a reply */
64         if (request->srvid == 0) {
65                 talloc_free(request);
66                 return;
67         }
68
69         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
70                                      result) == 0) {
71                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
72                                   (unsigned)request->pnn,
73                                   (unsigned long long)request->srvid));
74         } else {
75                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
76                                  (unsigned)request->pnn,
77                                  (unsigned long long)request->srvid));
78         }
79
80         talloc_free(request);
81 }
82
83 static void srvid_requests_reply(struct ctdb_context *ctdb,
84                                  struct srvid_requests **requests,
85                                  TDB_DATA result)
86 {
87         struct srvid_list *r;
88
89         if (*requests == NULL) {
90                 return;
91         }
92
93         for (r = (*requests)->requests; r != NULL; r = r->next) {
94                 srvid_request_reply(ctdb, r->request, result);
95         }
96
97         /* Free the list structure... */
98         TALLOC_FREE(*requests);
99 }
100
101 static void srvid_request_add(struct ctdb_context *ctdb,
102                               struct srvid_requests **requests,
103                               struct ctdb_srvid_message *request)
104 {
105         struct srvid_list *t;
106         int32_t ret;
107         TDB_DATA result;
108
109         if (*requests == NULL) {
110                 *requests = talloc_zero(ctdb, struct srvid_requests);
111                 if (*requests == NULL) {
112                         goto nomem;
113                 }
114         }
115
116         t = talloc_zero(*requests, struct srvid_list);
117         if (t == NULL) {
118                 /* If *requests was just allocated above then free it */
119                 if ((*requests)->requests == NULL) {
120                         TALLOC_FREE(*requests);
121                 }
122                 goto nomem;
123         }
124
125         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
126         DLIST_ADD((*requests)->requests, t);
127
128         return;
129
130 nomem:
131         /* Failed to add the request to the list.  Send a fail. */
132         DEBUG(DEBUG_ERR, (__location__
133                           " Out of memory, failed to queue SRVID request\n"));
134         ret = -ENOMEM;
135         result.dsize = sizeof(ret);
136         result.dptr = (uint8_t *)&ret;
137         srvid_request_reply(ctdb, request, result);
138 }
139
/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
	struct tevent_timer *timer;	/* non-NULL while disabled; fires to re-enable */
	bool in_progress;		/* operation currently running */
	const char *name;		/* human-readable name used in log messages */
};
147
148 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
149 {
150         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
151
152         if (state != NULL) {
153                 state->in_progress = false;
154                 state->name = name;
155         }
156
157         return state;
158 }
159
/* An operation is disabled exactly while its re-enable timer is pending */
static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
	return state->timer != NULL;
}
164
165 static bool ctdb_op_begin(struct ctdb_op_state *state)
166 {
167         if (ctdb_op_is_disabled(state)) {
168                 DEBUG(DEBUG_NOTICE,
169                       ("Unable to begin - %s are disabled\n", state->name));
170                 return false;
171         }
172
173         state->in_progress = true;
174         return true;
175 }
176
177 static bool ctdb_op_end(struct ctdb_op_state *state)
178 {
179         return state->in_progress = false;
180 }
181
/* True while the operation is between ctdb_op_begin() and ctdb_op_end() */
static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
	return state->in_progress;
}
186
/* Re-enable the operation by cancelling any pending disable timer */
static void ctdb_op_enable(struct ctdb_op_state *state)
{
	TALLOC_FREE(state->timer);
}
191
/* Timer callback: the disable period has elapsed, re-enable the operation.
 * Note ctdb_op_enable() frees state->timer, i.e. the timer that invoked us;
 * tevent permits a timer freeing itself from its own handler.
 */
static void ctdb_op_timeout_handler(struct tevent_context *ev,
				    struct tevent_timer *te,
				    struct timeval yt, void *p)
{
	struct ctdb_op_state *state =
		talloc_get_type(p, struct ctdb_op_state);

	DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
	ctdb_op_enable(state);
}
202
/*
 * Disable the operation for `timeout` seconds.
 *
 * timeout == 0 means re-enable immediately.  Disabling fails with
 * -EAGAIN while the operation is in progress and with -ENOMEM if the
 * re-enable timer cannot be created.  Returns 0 on success.  A
 * repeated disable replaces any previously armed timer.
 */
static int ctdb_op_disable(struct ctdb_op_state *state,
			   struct tevent_context *ev,
			   uint32_t timeout)
{
	if (timeout == 0) {
		DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
		ctdb_op_enable(state);
		return 0;
	}

	if (state->in_progress) {
		DEBUG(DEBUG_ERR,
		      ("Unable to disable %s - in progress\n", state->name));
		return -EAGAIN;
	}

	DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
			    state->name, timeout));

	/* Clear any old timers */
	talloc_free(state->timer);

	/* Arrange for the timeout to occur */
	state->timer = tevent_add_timer(ev, state,
					timeval_current_ofs(timeout, 0),
					ctdb_op_timeout_handler, state);
	if (state->timer == NULL) {
		DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
		return -ENOMEM;
	}

	return 0;
}
236
/* Per-node misbehaviour accounting used to decide when to ban a node */
struct ctdb_banning_state {
	uint32_t count;				/* accumulated banning credits */
	struct timeval last_reported_time;	/* last time this node misbehaved */
};

struct ctdb_recovery_lock_handle;
243
/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
	struct ctdb_context *ctdb;
	uint32_t recmaster;		/* PNN of the current recovery master */
	uint32_t last_culprit_node;	/* last node blamed for a failure */
	struct ctdb_node_map_old *nodemap;
	struct timeval priority_time;
	bool need_takeover_run;		/* IP takeover run pending */
	bool need_recovery;		/* database recovery pending */
	uint32_t node_flags;		/* this node's flags from the nodemap */
	struct tevent_timer *send_election_te;
	struct tevent_timer *election_timeout;	/* non-NULL while an election runs */
	struct srvid_requests *reallocate_requests;
	struct ctdb_op_state *takeover_run;	/* takeover-run disable state */
	struct ctdb_op_state *recovery;		/* recovery disable state */
	struct ctdb_iface_list_old *ifaces;
	uint32_t *force_rebalance_nodes;	/* talloc array of PNNs to rebalance */
	struct ctdb_node_capabilities *caps;
	bool frozen_on_inactive;
	struct ctdb_recovery_lock_handle *recovery_lock_handle;	/* non-NULL while holding/taking the reclock */
};
267
/* Timeouts derived from tunables; both expand to an absolute timeval */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* Forward declaration: restarts the recovery daemon after fatal errors */
static void ctdb_restart_recd(struct tevent_context *ev,
			      struct tevent_timer *te, struct timeval t,
			      void *private_data);
274
275 /*
276   ban a node for a period of time
277  */
278 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
279 {
280         int ret;
281         struct ctdb_context *ctdb = rec->ctdb;
282         struct ctdb_ban_state bantime;
283
284         if (!ctdb_validate_pnn(ctdb, pnn)) {
285                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
286                 return;
287         }
288
289         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
290
291         bantime.pnn  = pnn;
292         bantime.time = ban_time;
293
294         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
295         if (ret != 0) {
296                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
297                 return;
298         }
299
300 }
301
302 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
303
304
305 /*
306   remember the trouble maker
307  */
308 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
309 {
310         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
311         struct ctdb_banning_state *ban_state;
312
313         if (culprit > ctdb->num_nodes) {
314                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
315                 return;
316         }
317
318         /* If we are banned or stopped, do not set other nodes as culprits */
319         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
320                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
321                 return;
322         }
323
324         if (ctdb->nodes[culprit]->ban_state == NULL) {
325                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
326                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
327
328                 
329         }
330         ban_state = ctdb->nodes[culprit]->ban_state;
331         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
332                 /* this was the first time in a long while this node
333                    misbehaved so we will forgive any old transgressions.
334                 */
335                 ban_state->count = 0;
336         }
337
338         ban_state->count += count;
339         ban_state->last_reported_time = timeval_current();
340         rec->last_culprit_node = culprit;
341 }
342
/*
  remember the trouble maker

  Convenience wrapper: charge a single banning credit to `culprit`.
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	ctdb_set_culprit_count(rec, culprit, 1);
}
350
/*
  Retrieve capabilities from all connected nodes

  On success, caches the full per-node capability set in rec->caps
  (replacing any previous set) and updates this node's own
  ctdb->capabilities from the fetched data.  Returns 0 on success,
  -1 on failure.
 */
static int update_capabilities(struct ctdb_recoverd *rec,
			       struct ctdb_node_map_old *nodemap)
{
	uint32_t *capp;
	TALLOC_CTX *tmp_ctx;
	struct ctdb_node_capabilities *caps;
	struct ctdb_context *ctdb = rec->ctdb;

	tmp_ctx = talloc_new(rec);
	/* note: expands to `return -1` on allocation failure */
	CTDB_NO_MEMORY(ctdb, tmp_ctx);

	caps = ctdb_get_capabilities(ctdb, tmp_ctx,
				     CONTROL_TIMEOUT(), nodemap);

	if (caps == NULL) {
		DEBUG(DEBUG_ERR,
		      (__location__ " Failed to get node capabilities\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
	if (capp == NULL) {
		DEBUG(DEBUG_ERR,
		      (__location__
		       " Capabilities don't include current node.\n"));
		talloc_free(tmp_ctx);
		return -1;
	}
	ctdb->capabilities = *capp;

	/* Re-parent the new capability set onto rec before tmp_ctx dies */
	TALLOC_FREE(rec->caps);
	rec->caps = talloc_steal(rec, caps);

	talloc_free(tmp_ctx);
	return 0;
}
391
392 /*
393   change recovery mode on all nodes
394  */
395 static int set_recovery_mode(struct ctdb_context *ctdb,
396                              struct ctdb_recoverd *rec,
397                              struct ctdb_node_map_old *nodemap,
398                              uint32_t rec_mode)
399 {
400         TDB_DATA data;
401         uint32_t *nodes;
402         TALLOC_CTX *tmp_ctx;
403
404         tmp_ctx = talloc_new(ctdb);
405         CTDB_NO_MEMORY(ctdb, tmp_ctx);
406
407         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
408
409         data.dsize = sizeof(uint32_t);
410         data.dptr = (unsigned char *)&rec_mode;
411
412         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
413                                         nodes, 0,
414                                         CONTROL_TIMEOUT(),
415                                         false, data,
416                                         NULL, NULL,
417                                         NULL) != 0) {
418                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
419                 talloc_free(tmp_ctx);
420                 return -1;
421         }
422
423         talloc_free(tmp_ctx);
424         return 0;
425 }
426
427 /*
428  * Update flags on all connected nodes
429  */
430 static int update_flags_on_all_nodes(struct ctdb_recoverd *rec,
431                                      uint32_t pnn,
432                                      uint32_t flags)
433 {
434         struct ctdb_context *ctdb = rec->ctdb;
435         struct timeval timeout = CONTROL_TIMEOUT();
436         TDB_DATA data;
437         struct ctdb_node_map_old *nodemap=NULL;
438         struct ctdb_node_flag_change c;
439         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
440         uint32_t *nodes;
441         uint32_t i;
442         int ret;
443
444         nodemap = rec->nodemap;
445
446         for (i = 0; i < nodemap->num; i++) {
447                 if (pnn == nodemap->nodes[i].pnn) {
448                         break;
449                 }
450         }
451         if (i >= nodemap->num) {
452                 DBG_ERR("Nodemap does not contain node %d\n", pnn);
453                 talloc_free(tmp_ctx);
454                 return -1;
455         }
456
457         c.pnn       = pnn;
458         c.old_flags = nodemap->nodes[i].flags;
459         c.new_flags = flags;
460
461         data.dsize = sizeof(c);
462         data.dptr = (unsigned char *)&c;
463
464         /* send the flags update to all connected nodes */
465         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
466
467         ret = ctdb_client_async_control(ctdb,
468                                         CTDB_CONTROL_MODIFY_FLAGS,
469                                         nodes,
470                                         0,
471                                         timeout,
472                                         false,
473                                         data,
474                                         NULL,
475                                         NULL,
476                                         NULL);
477         if (ret != 0) {
478                 DBG_ERR("Unable to update flags on remote nodes\n");
479                 talloc_free(tmp_ctx);
480                 return -1;
481         }
482
483         talloc_free(tmp_ctx);
484         return 0;
485 }
486
487 /*
488   called when ctdb_wait_timeout should finish
489  */
490 static void ctdb_wait_handler(struct tevent_context *ev,
491                               struct tevent_timer *te,
492                               struct timeval yt, void *p)
493 {
494         uint32_t *timed_out = (uint32_t *)p;
495         (*timed_out) = 1;
496 }
497
/*
  wait for a given number of seconds

  Blocks by pumping the event loop until a one-shot timer fires, so
  other events keep being processed while waiting.  `secs` may be
  fractional; the fractional part is converted to microseconds.
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
	uint32_t timed_out = 0;
	time_t usecs = (secs - (time_t)secs) * 1000000;
	/* NOTE(review): tevent_add_timer() result is unchecked - on
	 * allocation failure this loop would never terminate. */
	tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
			 ctdb_wait_handler, &timed_out);
	while (!timed_out) {
		tevent_loop_once(ctdb->ev);
	}
}
511
/*
  called when an election times out (ends)

  Clears rec->election_timeout, which ctdb_wait_election() polls, and
  clears the global fast_start flag (declared elsewhere).
 */
static void ctdb_election_timeout(struct tevent_context *ev,
				  struct tevent_timer *te,
				  struct timeval t, void *p)
{
	struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
	rec->election_timeout = NULL;
	fast_start = false;

	D_WARNING("Election period ended, master=%u\n", rec->recmaster);
}
525
526
527 /*
528   wait for an election to finish. It finished election_timeout seconds after
529   the last election packet is received
530  */
531 static void ctdb_wait_election(struct ctdb_recoverd *rec)
532 {
533         struct ctdb_context *ctdb = rec->ctdb;
534         while (rec->election_timeout) {
535                 tevent_loop_once(ctdb->ev);
536         }
537 }
538
539 /*
540  * Update local flags from all remote connected nodes and push out
541  * flags changes to all nodes.  This is only run by the recovery
542  * master.
543  */
544 static int update_flags(struct ctdb_recoverd *rec,
545                         struct ctdb_node_map_old *nodemap,
546                         struct ctdb_node_map_old **remote_nodemaps)
547 {
548         unsigned int j;
549         struct ctdb_context *ctdb = rec->ctdb;
550         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
551
552         /* Check flags from remote nodes */
553         for (j=0; j<nodemap->num; j++) {
554                 struct ctdb_node_map_old *remote_nodemap=NULL;
555                 uint32_t local_flags = nodemap->nodes[j].flags;
556                 uint32_t remote_flags;
557                 int ret;
558
559                 if (local_flags & NODE_FLAGS_DISCONNECTED) {
560                         continue;
561                 }
562                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
563                         continue;
564                 }
565
566                 remote_nodemap = remote_nodemaps[j];
567                 remote_flags = remote_nodemap->nodes[j].flags;
568
569                 if (local_flags != remote_flags) {
570                         ret = update_flags_on_all_nodes(rec,
571                                                         nodemap->nodes[j].pnn,
572                                                         remote_flags);
573                         if (ret != 0) {
574                                 DBG_ERR(
575                                     "Unable to update flags on remote nodes\n");
576                                 talloc_free(mem_ctx);
577                                 return -1;
578                         }
579
580                         /*
581                          * Update the local copy of the flags in the
582                          * recovery daemon.
583                          */
584                         D_NOTICE("Remote node %u had flags 0x%x, "
585                                  "local had 0x%x - updating local\n",
586                                  nodemap->nodes[j].pnn,
587                                  remote_flags,
588                                  local_flags);
589                         nodemap->nodes[j].flags = remote_flags;
590                 }
591         }
592         talloc_free(mem_ctx);
593         return 0;
594 }
595
596
597 /* Create a new random generation id.
598    The generation id can not be the INVALID_GENERATION id
599 */
600 static uint32_t new_generation(void)
601 {
602         uint32_t generation;
603
604         while (1) {
605                 generation = random();
606
607                 if (generation != INVALID_GENERATION) {
608                         break;
609                 }
610         }
611
612         return generation;
613 }
614
/* True while this node holds (or is taking) the recovery lock */
static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
{
	return (rec->recovery_lock_handle != NULL);
}
619
/* State for an in-progress or held recovery-lock acquisition */
struct ctdb_recovery_lock_handle {
	bool done;	/* attempt finished (success or failure) */
	bool locked;	/* lock was successfully taken */
	double latency;	/* time taken to acquire, for reporting */
	struct ctdb_cluster_mutex_handle *h;	/* underlying mutex helper */
	struct ctdb_recoverd *rec;
};
627
/*
 * Completion callback for the cluster-mutex helper taking the
 * recovery lock.  `status` is the helper's single-character result
 * protocol: '0' = locked, '1' = contention, '2' = timeout, anything
 * else is an unknown error which gets this node banned.  Always sets
 * s->done so the ctdb_recovery_lock() wait loop terminates.
 */
static void take_reclock_handler(char status,
				 double latency,
				 void *private_data)
{
	struct ctdb_recovery_lock_handle *s =
		(struct ctdb_recovery_lock_handle *) private_data;

	s->locked = (status == '0') ;

	/*
	 * If unsuccessful then ensure the process has exited and that
	 * the file descriptor event handler has been cancelled
	 */
	if (! s->locked) {
		TALLOC_FREE(s->h);
	}

	switch (status) {
	case '0':
		s->latency = latency;
		break;

	case '1':
		D_ERR("Unable to take recovery lock - contention\n");
		break;

	case '2':
		D_ERR("Unable to take recovery lock - timeout\n");
		break;

	default:
		D_ERR("Unable to take recover lock - unknown error\n");

		{
			struct ctdb_recoverd *rec = s->rec;
			struct ctdb_context *ctdb = rec->ctdb;
			uint32_t pnn = ctdb_get_pnn(ctdb);

			D_ERR("Banning this node\n");
			ctdb_ban_node(rec,
				      pnn,
				      ctdb->tunable.recovery_ban_period);
		}
	}

	s->done = true;
}
675
/* Forward declaration: force_election is defined later in this file */
static void force_election(struct ctdb_recoverd *rec,
			   uint32_t pnn,
			   struct ctdb_node_map_old *nodemap);

/*
 * Called when the recovery-lock helper process dies unexpectedly:
 * drop our lock state and force a new recovery-master election.
 */
static void lost_reclock_handler(void *private_data)
{
	struct ctdb_recoverd *rec = talloc_get_type_abort(
		private_data, struct ctdb_recoverd);

	D_ERR("Recovery lock helper terminated, triggering an election\n");
	TALLOC_FREE(rec->recovery_lock_handle);

	force_election(rec, ctdb_get_pnn(rec->ctdb), rec->nodemap);
}
690
/*
 * Synchronously take the recovery lock.
 *
 * Starts the cluster-mutex helper (120s timeout) and pumps the event
 * loop until take_reclock_handler() marks the attempt done.  On
 * success the handle is kept in rec->recovery_lock_handle and the
 * acquisition latency is reported to the main daemon; returns true.
 * On any failure returns false with no lock state left behind.
 */
static bool ctdb_recovery_lock(struct ctdb_recoverd *rec)
{
	struct ctdb_context *ctdb = rec->ctdb;
	struct ctdb_cluster_mutex_handle *h;
	struct ctdb_recovery_lock_handle *s;

	s = talloc_zero(rec, struct ctdb_recovery_lock_handle);
	if (s == NULL) {
		DBG_ERR("Memory allocation error\n");
		return false;
	};

	s->rec = rec;

	h = ctdb_cluster_mutex(s,
			       ctdb,
			       ctdb->recovery_lock,
			       120,
			       take_reclock_handler,
			       s,
			       lost_reclock_handler,
			       rec);
	if (h == NULL) {
		talloc_free(s);
		return false;
	}

	/* Publish the handle before waiting so ctdb_recovery_unlock()
	 * can cancel a still-in-progress attempt. */
	rec->recovery_lock_handle = s;
	s->h = h;

	while (! s->done) {
		tevent_loop_once(ctdb->ev);
	}

	if (! s->locked) {
		TALLOC_FREE(rec->recovery_lock_handle);
		return false;
	}

	ctdb_ctrl_report_recd_lock_latency(ctdb,
					   CONTROL_TIMEOUT(),
					   s->latency);

	return true;
}
736
/*
 * Release the recovery lock, or cancel an acquisition that is still
 * in flight.  Safe to call when no lock is held (no-op).
 */
static void ctdb_recovery_unlock(struct ctdb_recoverd *rec)
{
	if (rec->recovery_lock_handle == NULL) {
		return;
	}

	if (! rec->recovery_lock_handle->done) {
		/*
		 * Taking of recovery lock still in progress.  Free
		 * the cluster mutex handle to release it but leave
		 * the recovery lock handle in place to allow taking
		 * of the lock to fail.
		 */
		D_NOTICE("Cancelling recovery lock\n");
		TALLOC_FREE(rec->recovery_lock_handle->h);
		rec->recovery_lock_handle->done = true;
		rec->recovery_lock_handle->locked = false;
		return;
	}

	D_NOTICE("Releasing recovery lock\n");
	TALLOC_FREE(rec->recovery_lock_handle);
}
760
/*
 * Ban any node whose accumulated banning credits have reached the
 * threshold (2 * number of nodes), resetting its credit count.
 * Sets *self_ban to true if this node banned itself.
 */
static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
{
	struct ctdb_context *ctdb = rec->ctdb;
	unsigned int i;
	struct ctdb_banning_state *ban_state;

	*self_ban = false;
	for (i=0; i<ctdb->num_nodes; i++) {
		if (ctdb->nodes[i]->ban_state == NULL) {
			continue;
		}
		ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
		if (ban_state->count < 2*ctdb->num_nodes) {
			continue;
		}

		DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
			ctdb->nodes[i]->pnn, ban_state->count,
			ctdb->tunable.recovery_ban_period));
		ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
		ban_state->count = 0;

		/* Banning ourself? */
		if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
			*self_ban = true;
		}
	}
}
789
/* State for a helper child process run by helper_run() */
struct helper_state {
	int fd[2];	/* pipe: [0] read end (parent), [1] write end (child) */
	pid_t pid;	/* helper child pid, -1 if not started */
	int result;	/* result value the helper writes down the pipe */
	bool done;	/* set when the helper's result has been read */
};
796
/*
 * fd event callback: read the helper's integer result from the pipe.
 * A short read (helper died without reporting) is mapped to EPIPE.
 * Always marks the run as done so helper_run()'s loop terminates.
 */
static void helper_handler(struct tevent_context *ev,
			   struct tevent_fd *fde,
			   uint16_t flags, void *private_data)
{
	struct helper_state *state = talloc_get_type_abort(
		private_data, struct helper_state);
	int ret;

	ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
	if (ret != sizeof(state->result)) {
		state->result = EPIPE;
	}

	state->done = true;
}
812
/*
 * Run an external helper program (`prog`) to completion, passing it
 * the write end of a pipe (argv[0]), the daemon socket name (argv[1])
 * and an optional extra argument (`arg`, may be NULL).  Pumps the
 * event loop until the helper reports a result over the pipe; aborts
 * early (result 1) if the recovery master changes, since this node
 * has then lost the election.  `type` is only used in log messages.
 * Returns 0 on success, -1 on failure.
 */
static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
		      const char *prog, const char *arg, const char *type)
{
	struct helper_state *state;
	struct tevent_fd *fde;
	const char **args;
	int nargs, ret;
	/* Snapshot the recmaster so a change can be detected below */
	uint32_t recmaster = rec->recmaster;

	state = talloc_zero(mem_ctx, struct helper_state);
	if (state == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
		return -1;
	}

	state->pid = -1;

	ret = pipe(state->fd);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,
		      ("Failed to create pipe for %s helper\n", type));
		goto fail;
	}

	/* The read end stays in the parent; only fd[1] is inherited */
	set_close_on_exec(state->fd[0]);

	nargs = 4;
	args = talloc_array(state, const char *, nargs);
	if (args == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
		goto fail;
	}

	/* argv[0] for the helper is the write-end fd number */
	args[0] = talloc_asprintf(args, "%d", state->fd[1]);
	if (args[0] == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
		goto fail;
	}
	args[1] = rec->ctdb->daemon.name;
	args[2] = arg;
	args[3] = NULL;

	/* With no extra argument, drop the trailing NULL slot count */
	if (args[2] == NULL) {
		nargs = 3;
	}

	state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
	if (state->pid == -1) {
		DEBUG(DEBUG_ERR,
		      ("Failed to create child for %s helper\n", type));
		goto fail;
	}

	/* Parent no longer needs the write end; helper holds it */
	close(state->fd[1]);
	state->fd[1] = -1;

	state->done = false;

	fde = tevent_add_fd(rec->ctdb->ev, state, state->fd[0],
			    TEVENT_FD_READ, helper_handler, state);
	if (fde == NULL) {
		goto fail;
	}
	/* NOTE(review): auto_close plus the manual close(state->fd[0])
	 * below means the fd number may be closed twice (once directly,
	 * once when fde is freed) - confirm tevent semantics here. */
	tevent_fd_set_auto_close(fde);

	while (!state->done) {
		tevent_loop_once(rec->ctdb->ev);

		/* If recmaster changes, we have lost election */
		if (recmaster != rec->recmaster) {
			D_ERR("Recmaster changed to %u, aborting %s\n",
			      rec->recmaster, type);
			state->result = 1;
			break;
		}
	}

	close(state->fd[0]);
	state->fd[0] = -1;

	if (state->result != 0) {
		goto fail;
	}

	ctdb_kill(rec->ctdb, state->pid, SIGKILL);
	talloc_free(state);
	return 0;

fail:
	if (state->fd[0] != -1) {
		close(state->fd[0]);
	}
	if (state->fd[1] != -1) {
		close(state->fd[1]);
	}
	if (state->pid != -1) {
		ctdb_kill(rec->ctdb, state->pid, SIGKILL);
	}
	talloc_free(state);
	return -1;
}
914
/*
 * Run the IP takeover helper.  Builds a comma-separated list of node
 * PNNs to force-rebalance (passed as the helper's extra argument, or
 * NULL when empty) and delegates to helper_run().  Honours the
 * failover_disabled config by exporting CTDB_DISABLE_IP_FAILOVER to
 * the helper's environment.  Returns helper_run()'s result, or -1 on
 * setup failure.
 */
static int ctdb_takeover(struct ctdb_recoverd *rec,
			 uint32_t *force_rebalance_nodes)
{
	/* Helper path is resolved once and cached across calls */
	static char prog[PATH_MAX+1] = "";
	char *arg;
	unsigned int i;
	int ret;

	if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
			     "CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
			     "ctdb_takeover_helper")) {
		ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
	}

	arg = NULL;
	for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
		uint32_t pnn = force_rebalance_nodes[i];
		if (arg == NULL) {
			arg = talloc_asprintf(rec, "%u", pnn);
		} else {
			arg = talloc_asprintf_append(arg, ",%u", pnn);
		}
		if (arg == NULL) {
			DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
			return -1;
		}
	}

	if (ctdb_config.failover_disabled) {
		ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
		if (ret != 0) {
			D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
			return -1;
		}
	}

	return helper_run(rec, rec, prog, arg, "takeover");
}
954
/*
 * Perform an IP takeover run, temporarily suspending takeover runs on
 * all other connected nodes so they cannot race with this one.
 *
 * On success the forced-rebalance target list is cleared (unless it
 * was replaced while the run was in progress).  On any failure
 * rec->need_takeover_run is set so the run is retried later.
 *
 * Returns true on success, false otherwise.
 */
static bool do_takeover_run(struct ctdb_recoverd *rec,
			    struct ctdb_node_map_old *nodemap)
{
	uint32_t *nodes = NULL;
	struct ctdb_disable_message dtr;
	TDB_DATA data;
	size_t i;
	/* Snapshot of the rebalance list pointer, used below to detect
	 * whether it was replaced while the takeover run was running.
	 */
	uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
	int ret;
	bool ok;

	DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));

	/* Serialise takeover runs on this node */
	if (ctdb_op_is_in_progress(rec->takeover_run)) {
		DEBUG(DEBUG_ERR, (__location__
				  " takeover run already in progress \n"));
		ok = false;
		goto done;
	}

	if (!ctdb_op_begin(rec->takeover_run)) {
		ok = false;
		goto done;
	}

	/* Disable IP checks (takeover runs, really) on other nodes
	 * while doing this takeover run.  This will stop those other
	 * nodes from triggering takeover runs when think they should
	 * be hosting an IP but it isn't yet on an interface.  Don't
	 * wait for replies since a failure here might cause some
	 * noise in the logs but will not actually cause a problem.
	 */
	ZERO_STRUCT(dtr);
	dtr.srvid = 0; /* No reply */
	dtr.pnn = -1;

	data.dptr  = (uint8_t*)&dtr;
	data.dsize = sizeof(dtr);

	nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);

	/* Disable for 60 seconds.  This can be a tunable later if
	 * necessary.
	 */
	dtr.timeout = 60;
	for (i = 0; i < talloc_array_length(nodes); i++) {
		if (ctdb_client_send_message(rec->ctdb, nodes[i],
					     CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
					     data) != 0) {
			DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
		}
	}

	ret = ctdb_takeover(rec, rec->force_rebalance_nodes);

	/* Reenable takeover runs and IP checks on other nodes */
	dtr.timeout = 0;
	for (i = 0; i < talloc_array_length(nodes); i++) {
		if (ctdb_client_send_message(rec->ctdb, nodes[i],
					     CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
					     data) != 0) {
			DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
		}
	}

	if (ret != 0) {
		DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
		ok = false;
		goto done;
	}

	ok = true;
	/* Takeover run was successful so clear force rebalance targets */
	if (rebalance_nodes == rec->force_rebalance_nodes) {
		TALLOC_FREE(rec->force_rebalance_nodes);
	} else {
		/* The list was replaced during the run; keep it so the
		 * new targets are honoured by the next takeover run.
		 */
		DEBUG(DEBUG_WARNING,
		      ("Rebalance target nodes changed during takeover run - not clearing\n"));
	}
done:
	rec->need_takeover_run = !ok;
	talloc_free(nodes);
	ctdb_op_end(rec->takeover_run);

	DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
	return ok;
}
1042
1043 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1044 {
1045         static char prog[PATH_MAX+1] = "";
1046         const char *arg;
1047
1048         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1049                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1050                              "ctdb_recovery_helper")) {
1051                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1052         }
1053
1054         arg = talloc_asprintf(mem_ctx, "%u", new_generation());
1055         if (arg == NULL) {
1056                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1057                 return -1;
1058         }
1059
1060         setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
1061
1062         return helper_run(rec, mem_ctx, prog, arg, "recovery");
1063 }
1064
1065 /*
1066   we are the recmaster, and recovery is needed - start a recovery run
1067  */
1068 static int do_recovery(struct ctdb_recoverd *rec,
1069                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1070                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1071 {
1072         struct ctdb_context *ctdb = rec->ctdb;
1073         unsigned int i;
1074         int ret;
1075         bool self_ban;
1076
1077         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1078
1079         /* Check if the current node is still the recmaster.  It's possible that
1080          * re-election has changed the recmaster.
1081          */
1082         if (pnn != rec->recmaster) {
1083                 DEBUG(DEBUG_NOTICE,
1084                       ("Recovery master changed to %u, aborting recovery\n",
1085                        rec->recmaster));
1086                 return -1;
1087         }
1088
1089         /* if recovery fails, force it again */
1090         rec->need_recovery = true;
1091
1092         if (!ctdb_op_begin(rec->recovery)) {
1093                 return -1;
1094         }
1095
1096         if (rec->election_timeout) {
1097                 /* an election is in progress */
1098                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1099                 goto fail;
1100         }
1101
1102         ban_misbehaving_nodes(rec, &self_ban);
1103         if (self_ban) {
1104                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1105                 goto fail;
1106         }
1107
1108         if (ctdb->recovery_lock != NULL) {
1109                 if (ctdb_recovery_have_lock(rec)) {
1110                         D_NOTICE("Already holding recovery lock\n");
1111                 } else {
1112                         bool ok;
1113
1114                         D_NOTICE("Attempting to take recovery lock (%s)\n",
1115                                  ctdb->recovery_lock);
1116
1117                         ok = ctdb_recovery_lock(rec);
1118                         if (! ok) {
1119                                 D_ERR("Unable to take recovery lock\n");
1120
1121                                 if (pnn != rec->recmaster) {
1122                                         D_NOTICE("Recovery master changed to %u,"
1123                                                  " aborting recovery\n",
1124                                                  rec->recmaster);
1125                                         rec->need_recovery = false;
1126                                         goto fail;
1127                                 }
1128
1129                                 if (ctdb->runstate ==
1130                                     CTDB_RUNSTATE_FIRST_RECOVERY) {
1131                                         /*
1132                                          * First recovery?  Perhaps
1133                                          * current node does not yet
1134                                          * know who the recmaster is.
1135                                          */
1136                                         D_ERR("Retrying recovery\n");
1137                                         goto fail;
1138                                 }
1139
1140                                 D_ERR("Abort recovery, "
1141                                       "ban this node for %u seconds\n",
1142                                       ctdb->tunable.recovery_ban_period);
1143                                 ctdb_ban_node(rec,
1144                                               pnn,
1145                                               ctdb->tunable.recovery_ban_period);
1146                                 goto fail;
1147                         }
1148                         D_NOTICE("Recovery lock taken successfully\n");
1149                 }
1150         }
1151
1152         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1153
1154         /* Retrieve capabilities from all connected nodes */
1155         ret = update_capabilities(rec, nodemap);
1156         if (ret!=0) {
1157                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1158                 return -1;
1159         }
1160
1161         /*
1162           update all nodes to have the same flags that we have
1163          */
1164         for (i=0;i<nodemap->num;i++) {
1165                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1166                         continue;
1167                 }
1168
1169                 ret = update_flags_on_all_nodes(rec,
1170                                                 nodemap->nodes[i].pnn,
1171                                                 nodemap->nodes[i].flags);
1172                 if (ret != 0) {
1173                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1174                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1175                         } else {
1176                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1177                                 return -1;
1178                         }
1179                 }
1180         }
1181
1182         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1183
1184         ret = db_recovery_parallel(rec, mem_ctx);
1185         if (ret != 0) {
1186                 goto fail;
1187         }
1188
1189         do_takeover_run(rec, nodemap);
1190
1191         /* send a message to all clients telling them that the cluster 
1192            has been reconfigured */
1193         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
1194                                        CTDB_SRVID_RECONFIGURE, tdb_null);
1195         if (ret != 0) {
1196                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
1197                 goto fail;
1198         }
1199
1200         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1201
1202         rec->need_recovery = false;
1203         ctdb_op_end(rec->recovery);
1204
1205         /* we managed to complete a full recovery, make sure to forgive
1206            any past sins by the nodes that could now participate in the
1207            recovery.
1208         */
1209         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1210         for (i=0;i<nodemap->num;i++) {
1211                 struct ctdb_banning_state *ban_state;
1212
1213                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1214                         continue;
1215                 }
1216
1217                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1218                 if (ban_state == NULL) {
1219                         continue;
1220                 }
1221
1222                 ban_state->count = 0;
1223         }
1224
1225         /* We just finished a recovery successfully.
1226            We now wait for rerecovery_timeout before we allow
1227            another recovery to take place.
1228         */
1229         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1230         ctdb_op_disable(rec->recovery, ctdb->ev,
1231                         ctdb->tunable.rerecovery_timeout);
1232         return 0;
1233
1234 fail:
1235         ctdb_op_end(rec->recovery);
1236         return -1;
1237 }
1238
1239
1240 /*
1241   elections are won by first checking the number of connected nodes, then
1242   the priority time, then the pnn
1243  */
1244 struct election_message {
1245         uint32_t num_connected;
1246         struct timeval priority_time;
1247         uint32_t pnn;
1248         uint32_t node_flags;
1249 };
1250
1251 /*
1252   form this nodes election data
1253  */
1254 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1255 {
1256         unsigned int i;
1257         int ret;
1258         struct ctdb_node_map_old *nodemap;
1259         struct ctdb_context *ctdb = rec->ctdb;
1260
1261         ZERO_STRUCTP(em);
1262
1263         em->pnn = rec->ctdb->pnn;
1264         em->priority_time = rec->priority_time;
1265
1266         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1267         if (ret != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
1269                 return;
1270         }
1271
1272         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1273         em->node_flags = rec->node_flags;
1274
1275         for (i=0;i<nodemap->num;i++) {
1276                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1277                         em->num_connected++;
1278                 }
1279         }
1280
1281         /* we shouldnt try to win this election if we cant be a recmaster */
1282         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1283                 em->num_connected = 0;
1284                 em->priority_time = timeval_current();
1285         }
1286
1287         talloc_free(nodemap);
1288 }
1289
1290 /*
1291   see if the given election data wins
1292  */
1293 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1294 {
1295         struct election_message myem;
1296         int cmp = 0;
1297
1298         ctdb_election_data(rec, &myem);
1299
1300         /* we cant win if we don't have the recmaster capability */
1301         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1302                 return false;
1303         }
1304
1305         /* we cant win if we are banned */
1306         if (rec->node_flags & NODE_FLAGS_BANNED) {
1307                 return false;
1308         }
1309
1310         /* we cant win if we are stopped */
1311         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1312                 return false;
1313         }
1314
1315         /* we will automatically win if the other node is banned */
1316         if (em->node_flags & NODE_FLAGS_BANNED) {
1317                 return true;
1318         }
1319
1320         /* we will automatically win if the other node is banned */
1321         if (em->node_flags & NODE_FLAGS_STOPPED) {
1322                 return true;
1323         }
1324
1325         /* then the longest running node */
1326         if (cmp == 0) {
1327                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1328         }
1329
1330         if (cmp == 0) {
1331                 cmp = (int)myem.pnn - (int)em->pnn;
1332         }
1333
1334         return cmp > 0;
1335 }
1336
1337 /*
1338   send out an election request
1339  */
1340 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1341 {
1342         int ret;
1343         TDB_DATA election_data;
1344         struct election_message emsg;
1345         uint64_t srvid;
1346         struct ctdb_context *ctdb = rec->ctdb;
1347
1348         srvid = CTDB_SRVID_ELECTION;
1349
1350         ctdb_election_data(rec, &emsg);
1351
1352         election_data.dsize = sizeof(struct election_message);
1353         election_data.dptr  = (unsigned char *)&emsg;
1354
1355
1356         /* first we assume we will win the election and set 
1357            recoverymaster to be ourself on the current node
1358          */
1359         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1360                                      CTDB_CURRENT_NODE, pnn);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
1363                 return -1;
1364         }
1365         rec->recmaster = pnn;
1366
1367         /* send an election message to all active nodes */
1368         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1369         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1370 }
1371
1372 /*
1373   we think we are winning the election - send a broadcast election request
1374  */
1375 static void election_send_request(struct tevent_context *ev,
1376                                   struct tevent_timer *te,
1377                                   struct timeval t, void *p)
1378 {
1379         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1380         int ret;
1381
1382         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1383         if (ret != 0) {
1384                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1385         }
1386
1387         TALLOC_FREE(rec->send_election_te);
1388 }
1389
1390 /*
1391   handler for memory dumps
1392 */
1393 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1394 {
1395         struct ctdb_recoverd *rec = talloc_get_type(
1396                 private_data, struct ctdb_recoverd);
1397         struct ctdb_context *ctdb = rec->ctdb;
1398         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1399         TDB_DATA *dump;
1400         int ret;
1401         struct ctdb_srvid_message *rd;
1402
1403         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1404                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1405                 talloc_free(tmp_ctx);
1406                 return;
1407         }
1408         rd = (struct ctdb_srvid_message *)data.dptr;
1409
1410         dump = talloc_zero(tmp_ctx, TDB_DATA);
1411         if (dump == NULL) {
1412                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1413                 talloc_free(tmp_ctx);
1414                 return;
1415         }
1416         ret = ctdb_dump_memory(ctdb, dump);
1417         if (ret != 0) {
1418                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1419                 talloc_free(tmp_ctx);
1420                 return;
1421         }
1422
1423 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1424
1425         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1426         if (ret != 0) {
1427                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1428                 talloc_free(tmp_ctx);
1429                 return;
1430         }
1431
1432         talloc_free(tmp_ctx);
1433 }
1434
1435 /*
1436   handler for reload_nodes
1437 */
1438 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
1439                                  void *private_data)
1440 {
1441         struct ctdb_recoverd *rec = talloc_get_type(
1442                 private_data, struct ctdb_recoverd);
1443
1444         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1445
1446         ctdb_load_nodes_file(rec->ctdb);
1447 }
1448
1449
1450 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
1451                                         void *private_data)
1452 {
1453         struct ctdb_recoverd *rec = talloc_get_type(
1454                 private_data, struct ctdb_recoverd);
1455         struct ctdb_context *ctdb = rec->ctdb;
1456         uint32_t pnn;
1457         uint32_t *t;
1458         int len;
1459
1460         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
1461                 return;
1462         }
1463
1464         if (data.dsize != sizeof(uint32_t)) {
1465                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1466                 return;
1467         }
1468
1469         pnn = *(uint32_t *)&data.dptr[0];
1470
1471         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
1472
1473         /* Copy any existing list of nodes.  There's probably some
1474          * sort of realloc variant that will do this but we need to
1475          * make sure that freeing the old array also cancels the timer
1476          * event for the timeout... not sure if realloc will do that.
1477          */
1478         len = (rec->force_rebalance_nodes != NULL) ?
1479                 talloc_array_length(rec->force_rebalance_nodes) :
1480                 0;
1481
1482         /* This allows duplicates to be added but they don't cause
1483          * harm.  A call to add a duplicate PNN arguably means that
1484          * the timeout should be reset, so this is the simplest
1485          * solution.
1486          */
1487         t = talloc_zero_array(rec, uint32_t, len+1);
1488         CTDB_NO_MEMORY_VOID(ctdb, t);
1489         if (len > 0) {
1490                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
1491         }
1492         t[len] = pnn;
1493
1494         talloc_free(rec->force_rebalance_nodes);
1495
1496         rec->force_rebalance_nodes = t;
1497 }
1498
1499
1500
1501 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
1502                                     TDB_DATA data,
1503                                     struct ctdb_op_state *op_state)
1504 {
1505         struct ctdb_disable_message *r;
1506         uint32_t timeout;
1507         TDB_DATA result;
1508         int32_t ret = 0;
1509
1510         /* Validate input data */
1511         if (data.dsize != sizeof(struct ctdb_disable_message)) {
1512                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1513                                  "expecting %lu\n", (long unsigned)data.dsize,
1514                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
1515                 return;
1516         }
1517         if (data.dptr == NULL) {
1518                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1519                 return;
1520         }
1521
1522         r = (struct ctdb_disable_message *)data.dptr;
1523         timeout = r->timeout;
1524
1525         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
1526         if (ret != 0) {
1527                 goto done;
1528         }
1529
1530         /* Returning our PNN tells the caller that we succeeded */
1531         ret = ctdb_get_pnn(ctdb);
1532 done:
1533         result.dsize = sizeof(int32_t);
1534         result.dptr  = (uint8_t *)&ret;
1535         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
1536 }
1537
1538 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
1539                                           void *private_data)
1540 {
1541         struct ctdb_recoverd *rec = talloc_get_type(
1542                 private_data, struct ctdb_recoverd);
1543
1544         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
1545 }
1546
/* Backward compatibility for this SRVID */
static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
				     void *private_data)
{
	struct ctdb_recoverd *rec = talloc_get_type(
		private_data, struct ctdb_recoverd);
	uint32_t timeout;

	/* Legacy payload is a bare uint32_t timeout, with no reply
	 * address - so unlike disable_takeover_runs_handler() no reply
	 * is sent.
	 */
	if (data.dsize != sizeof(uint32_t)) {
		DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
				 "expecting %lu\n", (long unsigned)data.dsize,
				 (long unsigned)sizeof(uint32_t)));
		return;
	}
	if (data.dptr == NULL) {
		DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
		return;
	}

	timeout = *((uint32_t *)data.dptr);

	/* Disable takeover runs for the requested number of seconds */
	ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
}
1570
1571 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
1572                                        void *private_data)
1573 {
1574         struct ctdb_recoverd *rec = talloc_get_type(
1575                 private_data, struct ctdb_recoverd);
1576
1577         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
1578 }
1579
1580 /*
1581   handler for ip reallocate, just add it to the list of requests and 
1582   handle this later in the monitor_cluster loop so we do not recurse
1583   with other requests to takeover_run()
1584 */
1585 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
1586                                   void *private_data)
1587 {
1588         struct ctdb_srvid_message *request;
1589         struct ctdb_recoverd *rec = talloc_get_type(
1590                 private_data, struct ctdb_recoverd);
1591
1592         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
1593                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1594                 return;
1595         }
1596
1597         request = (struct ctdb_srvid_message *)data.dptr;
1598
1599         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
1600 }
1601
1602 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
1603                                           struct ctdb_recoverd *rec)
1604 {
1605         TDB_DATA result;
1606         int32_t ret;
1607         struct srvid_requests *current;
1608
1609         /* Only process requests that are currently pending.  More
1610          * might come in while the takeover run is in progress and
1611          * they will need to be processed later since they might
1612          * be in response flag changes.
1613          */
1614         current = rec->reallocate_requests;
1615         rec->reallocate_requests = NULL;
1616
1617         if (do_takeover_run(rec, rec->nodemap)) {
1618                 ret = ctdb_get_pnn(ctdb);
1619         } else {
1620                 ret = -1;
1621         }
1622
1623         result.dsize = sizeof(int32_t);
1624         result.dptr  = (uint8_t *)&ret;
1625
1626         srvid_requests_reply(ctdb, &current, result);
1627 }
1628
1629 /*
1630  * handler for assigning banning credits
1631  */
1632 static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1633 {
1634         struct ctdb_recoverd *rec = talloc_get_type(
1635                 private_data, struct ctdb_recoverd);
1636         uint32_t ban_pnn;
1637
1638         /* Ignore if we are not recmaster */
1639         if (rec->ctdb->pnn != rec->recmaster) {
1640                 return;
1641         }
1642
1643         if (data.dsize != sizeof(uint32_t)) {
1644                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
1645                                   data.dsize));
1646                 return;
1647         }
1648
1649         ban_pnn = *(uint32_t *)data.dptr;
1650
1651         ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
1652 }
1653
1654 /*
1655   handler for recovery master elections
1656 */
1657 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1658 {
1659         struct ctdb_recoverd *rec = talloc_get_type(
1660                 private_data, struct ctdb_recoverd);
1661         struct ctdb_context *ctdb = rec->ctdb;
1662         int ret;
1663         struct election_message *em = (struct election_message *)data.dptr;
1664
1665         /* Ignore election packets from ourself */
1666         if (ctdb->pnn == em->pnn) {
1667                 return;
1668         }
1669
1670         /* we got an election packet - update the timeout for the election */
1671         talloc_free(rec->election_timeout);
1672         rec->election_timeout = tevent_add_timer(
1673                         ctdb->ev, ctdb,
1674                         fast_start ?
1675                                 timeval_current_ofs(0, 500000) :
1676                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1677                         ctdb_election_timeout, rec);
1678
1679         /* someone called an election. check their election data
1680            and if we disagree and we would rather be the elected node, 
1681            send a new election message to all other nodes
1682          */
1683         if (ctdb_election_win(rec, em)) {
1684                 if (!rec->send_election_te) {
1685                         rec->send_election_te = tevent_add_timer(
1686                                         ctdb->ev, rec,
1687                                         timeval_current_ofs(0, 500000),
1688                                         election_send_request, rec);
1689                 }
1690                 return;
1691         }
1692
1693         /* we didn't win */
1694         TALLOC_FREE(rec->send_election_te);
1695
1696         /* Release the recovery lock file */
1697         if (ctdb_recovery_have_lock(rec)) {
1698                 ctdb_recovery_unlock(rec);
1699         }
1700
1701         /* ok, let that guy become recmaster then */
1702         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
1703                                      CTDB_CURRENT_NODE, em->pnn);
1704         if (ret != 0) {
1705                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
1706                 return;
1707         }
1708         rec->recmaster = em->pnn;
1709
1710         return;
1711 }
1712
1713
1714 /*
1715   force the start of the election process
1716  */
1717 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1718                            struct ctdb_node_map_old *nodemap)
1719 {
1720         int ret;
1721         struct ctdb_context *ctdb = rec->ctdb;
1722
1723         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1724
1725         /* set all nodes to recovery mode to stop all internode traffic */
1726         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1727         if (ret != 0) {
1728                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1729                 return;
1730         }
1731
1732         talloc_free(rec->election_timeout);
1733         rec->election_timeout = tevent_add_timer(
1734                         ctdb->ev, ctdb,
1735                         fast_start ?
1736                                 timeval_current_ofs(0, 500000) :
1737                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
1738                         ctdb_election_timeout, rec);
1739
1740         ret = send_election_request(rec, pnn);
1741         if (ret!=0) {
1742                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1743                 return;
1744         }
1745
1746         /* wait for a few seconds to collect all responses */
1747         ctdb_wait_election(rec);
1748 }
1749
1750
1751
1752 /*
1753   handler for when a node changes its flags
1754 */
1755 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1756 {
1757         struct ctdb_recoverd *rec = talloc_get_type(
1758                 private_data, struct ctdb_recoverd);
1759         struct ctdb_context *ctdb = rec->ctdb;
1760         int ret;
1761         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1762         struct ctdb_node_map_old *nodemap=NULL;
1763         TALLOC_CTX *tmp_ctx;
1764         unsigned int i;
1765
1766         if (data.dsize != sizeof(*c)) {
1767                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1768                 return;
1769         }
1770
1771         tmp_ctx = talloc_new(ctdb);
1772         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1773
1774         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1775         if (ret != 0) {
1776                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1777                 talloc_free(tmp_ctx);
1778                 return;         
1779         }
1780
1781
1782         for (i=0;i<nodemap->num;i++) {
1783                 if (nodemap->nodes[i].pnn == c->pnn) break;
1784         }
1785
1786         if (i == nodemap->num) {
1787                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1788                 talloc_free(tmp_ctx);
1789                 return;
1790         }
1791
1792         if (c->old_flags != c->new_flags) {
1793                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1794         }
1795
1796         nodemap->nodes[i].flags = c->new_flags;
1797
1798         talloc_free(tmp_ctx);
1799 }
1800
1801 /*
1802   handler for when we need to push out flag changes to all other nodes
1803 */
1804 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
1805                                void *private_data)
1806 {
1807         struct ctdb_recoverd *rec = talloc_get_type(
1808                 private_data, struct ctdb_recoverd);
1809         struct ctdb_context *ctdb = rec->ctdb;
1810         int ret;
1811         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1812         struct ctdb_node_map_old *nodemap=NULL;
1813         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1814         uint32_t *nodes;
1815
1816         /* read the node flags from the recmaster */
1817         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
1818                                    tmp_ctx, &nodemap);
1819         if (ret != 0) {
1820                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
1821                 talloc_free(tmp_ctx);
1822                 return;
1823         }
1824         if (c->pnn >= nodemap->num) {
1825                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
1826                 talloc_free(tmp_ctx);
1827                 return;
1828         }
1829
1830         /* send the flags update to all connected nodes */
1831         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1832
1833         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1834                                       nodes, 0, CONTROL_TIMEOUT(),
1835                                       false, data,
1836                                       NULL, NULL,
1837                                       NULL) != 0) {
1838                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
1839
1840                 talloc_free(tmp_ctx);
1841                 return;
1842         }
1843
1844         talloc_free(tmp_ctx);
1845 }
1846
1847
/* State shared between verify_recmode() and its per-node callback */
struct verify_recmode_normal_data {
	uint32_t count;			/* outstanding GETRECMODE replies */
	enum monitor_result status;	/* aggregated result so far */
};
1852
1853 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1854 {
1855         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1856
1857
1858         /* one more node has responded with recmode data*/
1859         rmdata->count--;
1860
1861         /* if we failed to get the recmode, then return an error and let
1862            the main loop try again.
1863         */
1864         if (state->state != CTDB_CONTROL_DONE) {
1865                 if (rmdata->status == MONITOR_OK) {
1866                         rmdata->status = MONITOR_FAILED;
1867                 }
1868                 return;
1869         }
1870
1871         /* if we got a response, then the recmode will be stored in the
1872            status field
1873         */
1874         if (state->status != CTDB_RECOVERY_NORMAL) {
1875                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
1876                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1877         }
1878
1879         return;
1880 }
1881
1882
1883 /* verify that all nodes are in normal recovery mode */
1884 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
1885 {
1886         struct verify_recmode_normal_data *rmdata;
1887         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1888         struct ctdb_client_control_state *state;
1889         enum monitor_result status;
1890         unsigned int j;
1891
1892         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1893         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1894         rmdata->count  = 0;
1895         rmdata->status = MONITOR_OK;
1896
1897         /* loop over all active nodes and send an async getrecmode call to 
1898            them*/
1899         for (j=0; j<nodemap->num; j++) {
1900                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1901                         continue;
1902                 }
1903                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1904                                         CONTROL_TIMEOUT(), 
1905                                         nodemap->nodes[j].pnn);
1906                 if (state == NULL) {
1907                         /* we failed to send the control, treat this as 
1908                            an error and try again next iteration
1909                         */                      
1910                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1911                         talloc_free(mem_ctx);
1912                         return MONITOR_FAILED;
1913                 }
1914
1915                 /* set up the callback functions */
1916                 state->async.fn = verify_recmode_normal_callback;
1917                 state->async.private_data = rmdata;
1918
1919                 /* one more control to wait for to complete */
1920                 rmdata->count++;
1921         }
1922
1923
1924         /* now wait for up to the maximum number of seconds allowed
1925            or until all nodes we expect a response from has replied
1926         */
1927         while (rmdata->count > 0) {
1928                 tevent_loop_once(ctdb->ev);
1929         }
1930
1931         status = rmdata->status;
1932         talloc_free(mem_ctx);
1933         return status;
1934 }
1935
1936
/* State shared between verify_recmaster() and its per-node callback */
struct verify_recmaster_data {
	struct ctdb_recoverd *rec;	/* for culprit bookkeeping on disagreement */
	uint32_t count;			/* outstanding GETRECMASTER replies */
	uint32_t pnn;			/* the PNN every node is expected to report */
	enum monitor_result status;	/* aggregated result so far */
};
1943
1944 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1945 {
1946         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1947
1948
1949         /* one more node has responded with recmaster data*/
1950         rmdata->count--;
1951
1952         /* if we failed to get the recmaster, then return an error and let
1953            the main loop try again.
1954         */
1955         if (state->state != CTDB_CONTROL_DONE) {
1956                 if (rmdata->status == MONITOR_OK) {
1957                         rmdata->status = MONITOR_FAILED;
1958                 }
1959                 return;
1960         }
1961
1962         /* if we got a response, then the recmaster will be stored in the
1963            status field
1964         */
1965         if ((uint32_t)state->status != rmdata->pnn) {
1966                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
1967                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
1968                 rmdata->status = MONITOR_ELECTION_NEEDED;
1969         }
1970
1971         return;
1972 }
1973
1974
1975 /* verify that all nodes agree that we are the recmaster */
1976 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
1977 {
1978         struct ctdb_context *ctdb = rec->ctdb;
1979         struct verify_recmaster_data *rmdata;
1980         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1981         struct ctdb_client_control_state *state;
1982         enum monitor_result status;
1983         unsigned int j;
1984
1985         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1986         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1987         rmdata->rec    = rec;
1988         rmdata->count  = 0;
1989         rmdata->pnn    = pnn;
1990         rmdata->status = MONITOR_OK;
1991
1992         /* loop over all active nodes and send an async getrecmaster call to
1993            them*/
1994         for (j=0; j<nodemap->num; j++) {
1995                 if (nodemap->nodes[j].pnn == rec->recmaster) {
1996                         continue;
1997                 }
1998                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1999                         continue;
2000                 }
2001                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2002                                         CONTROL_TIMEOUT(),
2003                                         nodemap->nodes[j].pnn);
2004                 if (state == NULL) {
2005                         /* we failed to send the control, treat this as 
2006                            an error and try again next iteration
2007                         */                      
2008                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2009                         talloc_free(mem_ctx);
2010                         return MONITOR_FAILED;
2011                 }
2012
2013                 /* set up the callback functions */
2014                 state->async.fn = verify_recmaster_callback;
2015                 state->async.private_data = rmdata;
2016
2017                 /* one more control to wait for to complete */
2018                 rmdata->count++;
2019         }
2020
2021
2022         /* now wait for up to the maximum number of seconds allowed
2023            or until all nodes we expect a response from has replied
2024         */
2025         while (rmdata->count > 0) {
2026                 tevent_loop_once(ctdb->ev);
2027         }
2028
2029         status = rmdata->status;
2030         talloc_free(mem_ctx);
2031         return status;
2032 }
2033
2034 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2035                                     struct ctdb_recoverd *rec)
2036 {
2037         struct ctdb_iface_list_old *ifaces = NULL;
2038         TALLOC_CTX *mem_ctx;
2039         bool ret = false;
2040
2041         mem_ctx = talloc_new(NULL);
2042
2043         /* Read the interfaces from the local node */
2044         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2045                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2046                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2047                 /* We could return an error.  However, this will be
2048                  * rare so we'll decide that the interfaces have
2049                  * actually changed, just in case.
2050                  */
2051                 talloc_free(mem_ctx);
2052                 return true;
2053         }
2054
2055         if (!rec->ifaces) {
2056                 /* We haven't been here before so things have changed */
2057                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2058                 ret = true;
2059         } else if (rec->ifaces->num != ifaces->num) {
2060                 /* Number of interfaces has changed */
2061                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2062                                      rec->ifaces->num, ifaces->num));
2063                 ret = true;
2064         } else {
2065                 /* See if interface names or link states have changed */
2066                 unsigned int i;
2067                 for (i = 0; i < rec->ifaces->num; i++) {
2068                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
2069                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2070                                 DEBUG(DEBUG_NOTICE,
2071                                       ("Interface in slot %d changed: %s => %s\n",
2072                                        i, iface->name, ifaces->ifaces[i].name));
2073                                 ret = true;
2074                                 break;
2075                         }
2076                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2077                                 DEBUG(DEBUG_NOTICE,
2078                                       ("Interface %s changed state: %d => %d\n",
2079                                        iface->name, iface->link_state,
2080                                        ifaces->ifaces[i].link_state));
2081                                 ret = true;
2082                                 break;
2083                         }
2084                 }
2085         }
2086
2087         talloc_free(rec->ifaces);
2088         rec->ifaces = talloc_steal(rec, ifaces);
2089
2090         talloc_free(mem_ctx);
2091         return ret;
2092 }
2093
/* Check that the local allocation of public IP addresses is correct
 * and do some house-keeping.
 *
 * Non-recmaster nodes first drop any queued reallocate requests and
 * force-rebalance state (only the recmaster acts on those).  If a
 * discrepancy between expected and actual IP hosting is detected, a
 * takeover run is requested by broadcasting to all connected nodes.
 *
 * Returns 0 on success (including "takeover run requested"),
 * -1 if the local public IP lists could not be read. */
static int verify_local_ip_allocation(struct ctdb_context *ctdb,
				      struct ctdb_recoverd *rec,
				      uint32_t pnn,
				      struct ctdb_node_map_old *nodemap)
{
	TALLOC_CTX *mem_ctx = talloc_new(NULL);
	unsigned int j;
	int ret;
	bool need_takeover_run = false;
	struct ctdb_public_ip_list_old *ips = NULL;

	/* If we are not the recmaster then do some housekeeping */
	if (rec->recmaster != pnn) {
		/* Ignore any IP reallocate requests - only recmaster
		 * processes them
		 */
		TALLOC_FREE(rec->reallocate_requests);
		/* Clear any nodes that should be force rebalanced in
		 * the next takeover run.  If the recovery master role
		 * has moved then we don't want to process these some
		 * time in the future.
		 */
		TALLOC_FREE(rec->force_rebalance_nodes);
	}

	/* Return early if disabled... */
	if (ctdb_config.failover_disabled ||
	    ctdb_op_is_disabled(rec->takeover_run)) {
		talloc_free(mem_ctx);
		return  0;
	}

	/* An interface change (name, count or link state) is grounds
	 * for a takeover run */
	if (interfaces_have_changed(ctdb, rec)) {
		need_takeover_run = true;
	}

	/* If there are unhosted IPs but this node can host them then
	 * trigger an IP reallocation */

	/* Read *available* IPs from local node */
	ret = ctdb_ctrl_get_public_ips_flags(
		ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
		CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
	if (ret != 0) {
		DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
		talloc_free(mem_ctx);
		return -1;
	}

	/* flags == 0 means this node is fully healthy, so it could
	 * host a currently unassigned address */
	for (j=0; j<ips->num; j++) {
		if (ips->ips[j].pnn == CTDB_UNKNOWN_PNN &&
		    nodemap->nodes[pnn].flags == 0) {
			DEBUG(DEBUG_WARNING,
			      ("Unassigned IP %s can be served by this node\n",
			       ctdb_addr_to_str(&ips->ips[j].addr)));
			need_takeover_run = true;
		}
	}

	talloc_free(ips);

	if (!ctdb->do_checkpublicip) {
		goto done;
	}

	/* Validate the IP addresses that this node has on network
	 * interfaces.  If there is an inconsistency between reality
	 * and the state expected by CTDB then try to fix it by
	 * triggering an IP reallocation or releasing extraneous IP
	 * addresses. */

	/* Read *known* IPs from local node */
	ret = ctdb_ctrl_get_public_ips_flags(
		ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
	if (ret != 0) {
		DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
		talloc_free(mem_ctx);
		return -1;
	}

	for (j=0; j<ips->num; j++) {
		if (ips->ips[j].pnn == pnn) {
			/* CTDB thinks we hold this IP - verify it is
			 * actually configured on an interface */
			if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
				DEBUG(DEBUG_ERR,
				      ("Assigned IP %s not on an interface\n",
				       ctdb_addr_to_str(&ips->ips[j].addr)));
				need_takeover_run = true;
			}
		} else {
			/* CTDB thinks another node holds this IP - make
			 * sure we do not also have it configured */
			if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
				DEBUG(DEBUG_ERR,
				      ("IP %s incorrectly on an interface\n",
				       ctdb_addr_to_str(&ips->ips[j].addr)));
				need_takeover_run = true;
			}
		}
	}

done:
	if (need_takeover_run) {
		struct ctdb_srvid_message rd;
		TDB_DATA data;

		DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));

		/* srvid == 0 means no reply is expected */
		ZERO_STRUCT(rd);
		rd.pnn = ctdb->pnn;
		rd.srvid = 0;
		data.dptr = (uint8_t *)&rd;
		data.dsize = sizeof(rd);

		ret = ctdb_client_send_message(ctdb,
					       CTDB_BROADCAST_CONNECTED,
					       CTDB_SRVID_TAKEOVER_RUN,
					       data);
		if (ret != 0) {
			D_ERR("Failed to send takeover run request\n");
		}
	}
	talloc_free(mem_ctx);
	return 0;
}
2218
2219
/* Context handed to the async GET_NODEMAP callbacks */
struct remote_nodemaps_state {
	struct ctdb_node_map_old **remote_nodemaps;	/* one slot per node in rec->nodemap */
	struct ctdb_recoverd *rec;
};
2224
2225 static void async_getnodemap_callback(struct ctdb_context *ctdb,
2226                                       uint32_t node_pnn,
2227                                       int32_t res,
2228                                       TDB_DATA outdata,
2229                                       void *callback_data)
2230 {
2231         struct remote_nodemaps_state *state =
2232                 (struct remote_nodemaps_state *)callback_data;
2233         struct ctdb_node_map_old **remote_nodemaps = state->remote_nodemaps;
2234         struct ctdb_node_map_old *nodemap = state->rec->nodemap;
2235         size_t i;
2236
2237         for (i = 0; i < nodemap->num; i++) {
2238                 if (nodemap->nodes[i].pnn == node_pnn) {
2239                         break;
2240                 }
2241         }
2242
2243         if (i >= nodemap->num) {
2244                 DBG_ERR("Invalid PNN %"PRIu32"\n", node_pnn);
2245                 return;
2246         }
2247
2248         remote_nodemaps[i] = (struct ctdb_node_map_old *)talloc_steal(
2249                                         remote_nodemaps, outdata.dptr);
2250
2251 }
2252
2253 static void async_getnodemap_error(struct ctdb_context *ctdb,
2254                                    uint32_t node_pnn,
2255                                    int32_t res,
2256                                    TDB_DATA outdata,
2257                                    void *callback_data)
2258 {
2259         struct remote_nodemaps_state *state =
2260                 (struct remote_nodemaps_state *)callback_data;
2261         struct ctdb_recoverd *rec = state->rec;
2262
2263         DBG_ERR("Failed to retrieve nodemap from node %u\n", node_pnn);
2264         ctdb_set_culprit(rec, node_pnn);
2265 }
2266
2267 static int get_remote_nodemaps(struct ctdb_recoverd *rec,
2268                                TALLOC_CTX *mem_ctx,
2269                                struct ctdb_node_map_old ***remote_nodemaps)
2270 {
2271         struct ctdb_context *ctdb = rec->ctdb;
2272         struct ctdb_node_map_old **t;
2273         uint32_t *nodes;
2274         struct remote_nodemaps_state state;
2275         int ret;
2276
2277         t = talloc_zero_array(mem_ctx,
2278                               struct ctdb_node_map_old *,
2279                               rec->nodemap->num);
2280         if (t == NULL) {
2281                 DBG_ERR("Memory allocation error\n");
2282                 return -1;
2283         }
2284
2285         nodes = list_of_connected_nodes(ctdb, rec->nodemap, mem_ctx, false);
2286
2287         state.remote_nodemaps = t;
2288         state.rec = rec;
2289
2290         ret = ctdb_client_async_control(ctdb,
2291                                         CTDB_CONTROL_GET_NODEMAP,
2292                                         nodes,
2293                                         0,
2294                                         CONTROL_TIMEOUT(),
2295                                         false,
2296                                         tdb_null,
2297                                         async_getnodemap_callback,
2298                                         async_getnodemap_error,
2299                                         &state);
2300         talloc_free(nodes);
2301
2302         if (ret != 0) {
2303                 talloc_free(t);
2304                 return ret;
2305         }
2306
2307         *remote_nodemaps = t;
2308         return 0;
2309 }
2310
/* Check that the currently-known recovery master is still valid.
 *
 * Forces a new election (and returns false) if the recmaster is
 * unknown, lacks CTDB_CAP_RECMASTER while we have it, has been
 * deleted from the nodemap, is disconnected, or reports itself as
 * inactive.  Also returns false (without an election) if the
 * recmaster cannot be queried.  Returns true when the current
 * recmaster can be kept. */
static bool validate_recovery_master(struct ctdb_recoverd *rec,
				     TALLOC_CTX *mem_ctx)
{
	struct ctdb_context *ctdb = rec->ctdb;
	uint32_t pnn = ctdb_get_pnn(ctdb);
	struct ctdb_node_map_old *nodemap = rec->nodemap;
	struct ctdb_node_map_old *recmaster_nodemap = NULL;
	int ret;

	/* When recovery daemon is started, recmaster is set to
	 * "unknown" so it knows to start an election.
	 */
	if (rec->recmaster == CTDB_UNKNOWN_PNN) {
		DEBUG(DEBUG_NOTICE,
		      ("Initial recovery master set - forcing election\n"));
		force_election(rec, pnn, nodemap);
		return false;
	}

	/*
	 * If the current recmaster does not have CTDB_CAP_RECMASTER,
	 * but we have, then force an election and try to become the new
	 * recmaster.
	 */
	if (!ctdb_node_has_capabilities(rec->caps,
					rec->recmaster,
					CTDB_CAP_RECMASTER) &&
	    (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
	    !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
		DEBUG(DEBUG_ERR,
		      (" Current recmaster node %u does not have CAP_RECMASTER,"
		       " but we (node %u) have - force an election\n",
		       rec->recmaster, pnn));
		force_election(rec, pnn, nodemap);
		return false;
	}

	/* Verify that the master node has not been deleted.  This
	 * should not happen because a node should always be shutdown
	 * before being deleted, causing a new master to be elected
	 * before now.  However, if something strange has happened
	 * then checking here will ensure we don't index beyond the
	 * end of the nodemap array. */
	if (rec->recmaster >= nodemap->num) {
		DEBUG(DEBUG_ERR,
		      ("Recmaster node %u has been deleted. Force election\n",
		       rec->recmaster));
		force_election(rec, pnn, nodemap);
		return false;
	}

	/* if recovery master is disconnected/deleted we must elect a new recmaster */
	if (nodemap->nodes[rec->recmaster].flags &
	    (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
		DEBUG(DEBUG_NOTICE,
		      ("Recmaster node %u is disconnected/deleted. Force election\n",
		       rec->recmaster));
		force_election(rec, pnn, nodemap);
		return false;
	}

	/* get nodemap from the recovery master to check if it is inactive */
	ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
				   mem_ctx, &recmaster_nodemap);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,
		      (__location__
		       " Unable to get nodemap from recovery master %u\n",
			  rec->recmaster));
		/* No election, just error */
		return false;
	}

	/* Trust the recmaster's own view of its flags here - our local
	 * view may be stale */
	if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
	    (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
		DEBUG(DEBUG_NOTICE,
		      ("Recmaster node %u is inactive. Force election\n",
		       rec->recmaster));
		/*
		 * update our nodemap to carry the recmaster's notion of
		 * its own flags, so that we don't keep freezing the
		 * inactive recmaster node...
		 */
		nodemap->nodes[rec->recmaster].flags =
			recmaster_nodemap->nodes[rec->recmaster].flags;
		force_election(rec, pnn, nodemap);
		return false;
	}

	return true;
}
2403
2404 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2405                       TALLOC_CTX *mem_ctx)
2406 {
2407         uint32_t pnn;
2408         struct ctdb_node_map_old *nodemap=NULL;
2409         struct ctdb_node_map_old **remote_nodemaps=NULL;
2410         struct ctdb_vnn_map *vnnmap=NULL;
2411         struct ctdb_vnn_map *remote_vnnmap=NULL;
2412         uint32_t num_lmasters;
2413         int32_t debug_level;
2414         unsigned int i, j;
2415         int ret;
2416         bool self_ban;
2417
2418
2419         /* verify that the main daemon is still running */
2420         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
2421                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2422                 exit(-1);
2423         }
2424
2425         /* ping the local daemon to tell it we are alive */
2426         ctdb_ctrl_recd_ping(ctdb);
2427
2428         if (rec->election_timeout) {
2429                 /* an election is in progress */
2430                 return;
2431         }
2432
2433         /* read the debug level from the parent and update locally */
2434         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2435         if (ret !=0) {
2436                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2437                 return;
2438         }
2439         debuglevel_set(debug_level);
2440
2441         /* get relevant tunables */
2442         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2443         if (ret != 0) {
2444                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2445                 return;
2446         }
2447
2448         /* get runstate */
2449         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
2450                                      CTDB_CURRENT_NODE, &ctdb->runstate);
2451         if (ret != 0) {
2452                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
2453                 return;
2454         }
2455
2456         pnn = ctdb_get_pnn(ctdb);
2457
2458         /* get nodemap */
2459         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &nodemap);
2460         if (ret != 0) {
2461                 DBG_ERR("Unable to get nodemap from node %"PRIu32"\n", pnn);
2462                 return;
2463         }
2464         talloc_free(rec->nodemap);
2465         rec->nodemap = nodemap;
2466
2467         /* remember our own node flags */
2468         rec->node_flags = nodemap->nodes[pnn].flags;
2469
2470         ban_misbehaving_nodes(rec, &self_ban);
2471         if (self_ban) {
2472                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
2473                 return;
2474         }
2475
2476         ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2477                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2478         if (ret != 0) {
2479                 D_ERR("Failed to read recmode from local node\n");
2480                 return;
2481         }
2482
2483         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2484            also frozen and that the recmode is set to active.
2485         */
2486         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2487                 /* If this node has become inactive then we want to
2488                  * reduce the chances of it taking over the recovery
2489                  * master role when it becomes active again.  This
2490                  * helps to stabilise the recovery master role so that
2491                  * it stays on the most stable node.
2492                  */
2493                 rec->priority_time = timeval_current();
2494
2495                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2496                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2497
2498                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2499                         if (ret != 0) {
2500                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
2501
2502                                 return;
2503                         }
2504                 }
2505                 if (! rec->frozen_on_inactive) {
2506                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
2507                                                CTDB_CURRENT_NODE);
2508                         if (ret != 0) {
2509                                 DEBUG(DEBUG_ERR,
2510                                       (__location__ " Failed to freeze node "
2511                                        "in STOPPED or BANNED state\n"));
2512                                 return;
2513                         }
2514
2515                         rec->frozen_on_inactive = true;
2516                 }
2517
2518                 /* If this node is stopped or banned then it is not the recovery
2519                  * master, so don't do anything. This prevents stopped or banned
2520                  * node from starting election and sending unnecessary controls.
2521                  */
2522                 return;
2523         }
2524
2525         rec->frozen_on_inactive = false;
2526
2527         /* Retrieve capabilities from all connected nodes */
2528         ret = update_capabilities(rec, nodemap);
2529         if (ret != 0) {
2530                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2531                 return;
2532         }
2533
2534         if (! validate_recovery_master(rec, mem_ctx)) {
2535                 return;
2536         }
2537
2538         if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2539                 /* Check if an IP takeover run is needed and trigger one if
2540                  * necessary */
2541                 verify_local_ip_allocation(ctdb, rec, pnn, nodemap);
2542         }
2543
2544         /* if we are not the recmaster then we do not need to check
2545            if recovery is needed
2546          */
2547         if (pnn != rec->recmaster) {
2548                 return;
2549         }
2550
2551
2552         /* Get the nodemaps for all connected remote nodes */
2553         ret = get_remote_nodemaps(rec, mem_ctx, &remote_nodemaps);
2554         if (ret != 0) {
2555                 DBG_ERR("Failed to read remote nodemaps\n");
2556                 return;
2557         }
2558
2559         /* Ensure our local and remote flags are correct */
2560         ret = update_flags(rec, nodemap, remote_nodemaps);
2561         if (ret != 0) {
2562                 D_ERR("Unable to update flags\n");
2563                 return;
2564         }
2565
2566         if (ctdb->num_nodes != nodemap->num) {
2567                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2568                 ctdb_load_nodes_file(ctdb);
2569                 return;
2570         }
2571
2572         /* verify that all active nodes agree that we are the recmaster */
2573         switch (verify_recmaster(rec, nodemap, pnn)) {
2574         case MONITOR_RECOVERY_NEEDED:
2575                 /* can not happen */
2576                 return;
2577         case MONITOR_ELECTION_NEEDED:
2578                 force_election(rec, pnn, nodemap);
2579                 return;
2580         case MONITOR_OK:
2581                 break;
2582         case MONITOR_FAILED:
2583                 return;
2584         }
2585
2586
2587         /* get the vnnmap */
2588         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2589         if (ret != 0) {
2590                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2591                 return;
2592         }
2593
2594         if (rec->need_recovery) {
2595                 /* a previous recovery didn't finish */
2596                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2597                 return;
2598         }
2599
2600         /* verify that all active nodes are in normal mode 
2601            and not in recovery mode 
2602         */
2603         switch (verify_recmode(ctdb, nodemap)) {
2604         case MONITOR_RECOVERY_NEEDED:
2605                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2606                 return;
2607         case MONITOR_FAILED:
2608                 return;
2609         case MONITOR_ELECTION_NEEDED:
2610                 /* can not happen */
2611         case MONITOR_OK:
2612                 break;
2613         }
2614
2615
2616         if (ctdb->recovery_lock != NULL) {
2617                 /* We must already hold the recovery lock */
2618                 if (!ctdb_recovery_have_lock(rec)) {
2619                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
2620                         ctdb_set_culprit(rec, ctdb->pnn);
2621                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2622                         return;
2623                 }
2624         }
2625
2626
2627         /* If recoveries are disabled then there is no use doing any
2628          * nodemap or flags checks.  Recoveries might be disabled due
2629          * to "reloadnodes", so doing these checks might cause an
2630          * unnecessary recovery.  */
2631         if (ctdb_op_is_disabled(rec->recovery)) {
2632                 goto takeover_run_checks;
2633         }
2634
2635         /* verify that all other nodes have the same nodemap as we have
2636         */
2637         for (j=0; j<nodemap->num; j++) {
2638                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
2639                         continue;
2640                 }
2641                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2642                         continue;
2643                 }
2644
2645                 /* if the nodes disagree on how many nodes there are
2646                    then this is a good reason to try recovery
2647                  */
2648                 if (remote_nodemaps[j]->num != nodemap->num) {
2649                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2650                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2651                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2652                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2653                         return;
2654                 }
2655
2656                 /* if the nodes disagree on which nodes exist and are
2657                    active, then that is also a good reason to do recovery
2658                  */
2659                 for (i=0;i<nodemap->num;i++) {
2660                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2661                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2662                                           nodemap->nodes[j].pnn, i, 
2663                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2664                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2665                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2666                                             vnnmap);
2667                                 return;
2668                         }
2669                 }
2670         }
2671
2672         /*
2673          * Update node flags obtained from each active node. This ensure we have
2674          * up-to-date information for all the nodes.
2675          */
2676         for (j=0; j<nodemap->num; j++) {
2677                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
2678                         continue;
2679                 }
2680                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2681                         continue;
2682                 }
2683                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
2684         }
2685
2686         for (j=0; j<nodemap->num; j++) {
2687                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
2688                         continue;
2689                 }
2690                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2691                         continue;
2692                 }
2693
2694                 /* verify the flags are consistent
2695                 */
2696                 for (i=0; i<nodemap->num; i++) {
2697                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2698                                 continue;
2699                         }
2700                         
2701                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2702                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2703                                   nodemap->nodes[j].pnn, 
2704                                   nodemap->nodes[i].pnn, 
2705                                   remote_nodemaps[j]->nodes[i].flags,
2706                                   nodemap->nodes[i].flags));
2707                                 if (i == j) {
2708                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2709                                         update_flags_on_all_nodes(
2710                                             rec,
2711                                             nodemap->nodes[i].pnn,
2712                                             remote_nodemaps[j]->nodes[i].flags);
2713                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2714                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2715                                                     vnnmap);
2716                                         return;
2717                                 } else {
2718                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2719                                         update_flags_on_all_nodes(
2720                                                 rec,
2721                                                 nodemap->nodes[i].pnn,
2722                                                 nodemap->nodes[i].flags);
2723                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2724                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2725                                                     vnnmap);
2726                                         return;
2727                                 }
2728                         }
2729                 }
2730         }
2731
2732
2733         /* count how many active nodes there are */
2734         num_lmasters  = 0;
2735         for (i=0; i<nodemap->num; i++) {
2736                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2737                         if (ctdb_node_has_capabilities(rec->caps,
2738                                                        ctdb->nodes[i]->pnn,
2739                                                        CTDB_CAP_LMASTER)) {
2740                                 num_lmasters++;
2741                         }
2742                 }
2743         }
2744
2745
2746         /* There must be the same number of lmasters in the vnn map as
2747          * there are active nodes with the lmaster capability...  or
2748          * do a recovery.
2749          */
2750         if (vnnmap->size != num_lmasters) {
2751                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
2752                           vnnmap->size, num_lmasters));
2753                 ctdb_set_culprit(rec, ctdb->pnn);
2754                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2755                 return;
2756         }
2757
2758         /*
2759          * Verify that all active lmaster nodes in the nodemap also
2760          * exist in the vnnmap
2761          */
2762         for (j=0; j<nodemap->num; j++) {
2763                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2764                         continue;
2765                 }
2766                 if (! ctdb_node_has_capabilities(rec->caps,
2767                                                  nodemap->nodes[j].pnn,
2768                                                  CTDB_CAP_LMASTER)) {
2769                         continue;
2770                 }
2771                 if (nodemap->nodes[j].pnn == pnn) {
2772                         continue;
2773                 }
2774
2775                 for (i=0; i<vnnmap->size; i++) {
2776                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2777                                 break;
2778                         }
2779                 }
2780                 if (i == vnnmap->size) {
2781                         D_ERR("Active LMASTER node %u is not in the vnnmap\n",
2782                               nodemap->nodes[j].pnn);
2783                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2784                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2785                         return;
2786                 }
2787         }
2788
2789         
2790         /* verify that all other nodes have the same vnnmap
2791            and are from the same generation
2792          */
2793         for (j=0; j<nodemap->num; j++) {
2794                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2795                         continue;
2796                 }
2797                 if (nodemap->nodes[j].pnn == pnn) {
2798                         continue;
2799                 }
2800
2801                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2802                                           mem_ctx, &remote_vnnmap);
2803                 if (ret != 0) {
2804                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2805                                   nodemap->nodes[j].pnn));
2806                         return;
2807                 }
2808
2809                 /* verify the vnnmap generation is the same */
2810                 if (vnnmap->generation != remote_vnnmap->generation) {
2811                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2812                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2813                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2814                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2815                         return;
2816                 }
2817
2818                 /* verify the vnnmap size is the same */
2819                 if (vnnmap->size != remote_vnnmap->size) {
2820                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2821                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2822                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2823                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2824                         return;
2825                 }
2826
2827                 /* verify the vnnmap is the same */
2828                 for (i=0;i<vnnmap->size;i++) {
2829                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2830                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2831                                           nodemap->nodes[j].pnn));
2832                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2833                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2834                                             vnnmap);
2835                                 return;
2836                         }
2837                 }
2838         }
2839
2840         /* FIXME: Add remote public IP checking to ensure that nodes
2841          * have the IP addresses that are allocated to them. */
2842
2843 takeover_run_checks:
2844
2845         /* If there are IP takeover runs requested or the previous one
2846          * failed then perform one and notify the waiters */
2847         if (!ctdb_op_is_disabled(rec->takeover_run) &&
2848             (rec->reallocate_requests || rec->need_takeover_run)) {
2849                 process_ipreallocate_requests(ctdb, rec);
2850         }
2851 }
2852
2853 static void recd_sig_term_handler(struct tevent_context *ev,
2854                                   struct tevent_signal *se, int signum,
2855                                   int count, void *dont_care,
2856                                   void *private_data)
2857 {
2858         struct ctdb_recoverd *rec = talloc_get_type_abort(
2859                 private_data, struct ctdb_recoverd);
2860
2861         DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
2862         ctdb_recovery_unlock(rec);
2863         exit(0);
2864 }
2865
2866 /*
2867  * Periodically log elements of the cluster state
2868  *
2869  * This can be used to confirm a split brain has occurred
2870  */
2871 static void maybe_log_cluster_state(struct tevent_context *ev,
2872                                     struct tevent_timer *te,
2873                                     struct timeval current_time,
2874                                     void *private_data)
2875 {
2876         struct ctdb_recoverd *rec = talloc_get_type_abort(
2877                 private_data, struct ctdb_recoverd);
2878         struct ctdb_context *ctdb = rec->ctdb;
2879         struct tevent_timer *tt;
2880
2881         static struct timeval start_incomplete = {
2882                 .tv_sec = 0,
2883         };
2884
2885         bool is_complete;
2886         bool was_complete;
2887         unsigned int i;
2888         double seconds;
2889         unsigned int minutes;
2890         unsigned int num_connected;
2891
2892         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2893                 goto done;
2894         }
2895
2896         if (rec->nodemap == NULL) {
2897                 goto done;
2898         }
2899
2900         is_complete = true;
2901         num_connected = 0;
2902         for (i = 0; i < rec->nodemap->num; i++) {
2903                 struct ctdb_node_and_flags *n = &rec->nodemap->nodes[i];
2904
2905                 if (n->pnn == ctdb_get_pnn(ctdb)) {
2906                         continue;
2907                 }
2908                 if ((n->flags & NODE_FLAGS_DELETED) != 0) {
2909                         continue;
2910                 }
2911                 if ((n->flags & NODE_FLAGS_DISCONNECTED) != 0) {
2912                         is_complete = false;
2913                         continue;
2914                 }
2915
2916                 num_connected++;
2917         }
2918
2919         was_complete = timeval_is_zero(&start_incomplete);
2920
2921         if (is_complete) {
2922                 if (! was_complete) {
2923                         D_WARNING("Cluster complete with master=%u\n",
2924                                   rec->recmaster);
2925                         start_incomplete = timeval_zero();
2926                 }
2927                 goto done;
2928         }
2929
2930         /* Cluster is newly incomplete... */
2931         if (was_complete) {
2932                 start_incomplete = current_time;
2933                 minutes = 0;
2934                 goto log;
2935         }
2936
2937         /*
2938          * Cluster has been incomplete since previous check, so figure
2939          * out how long (in minutes) and decide whether to log anything
2940          */
2941         seconds = timeval_elapsed2(&start_incomplete, &current_time);
2942         minutes = (unsigned int)seconds / 60;
2943         if (minutes >= 60) {
2944                 /* Over an hour, log every hour */
2945                 if (minutes % 60 != 0) {
2946                         goto done;
2947                 }
2948         } else if (minutes >= 10) {
2949                 /* Over 10 minutes, log every 10 minutes */
2950                 if (minutes % 10 != 0) {
2951                         goto done;
2952                 }
2953         }
2954
2955 log:
2956         D_WARNING("Cluster incomplete with master=%u, elapsed=%u minutes, "
2957                   "connected=%u\n",
2958                   rec->recmaster,
2959                   minutes,
2960                   num_connected);
2961
2962 done:
2963         tt = tevent_add_timer(ctdb->ev,
2964                               rec,
2965                               timeval_current_ofs(60, 0),
2966                               maybe_log_cluster_state,
2967                               rec);
2968         if (tt == NULL) {
2969                 DBG_WARNING("Failed to set up cluster state timer\n");
2970         }
2971 }
2972
2973 /*
2974   the main monitoring loop
2975  */
2976 static void monitor_cluster(struct ctdb_context *ctdb)
2977 {
2978         struct tevent_signal *se;
2979         struct ctdb_recoverd *rec;
2980
2981         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2982
2983         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2984         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2985
2986         rec->ctdb = ctdb;
2987         rec->recmaster = CTDB_UNKNOWN_PNN;
2988         rec->recovery_lock_handle = NULL;
2989
2990         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
2991         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
2992
2993         rec->recovery = ctdb_op_init(rec, "recoveries");
2994         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
2995
2996         rec->priority_time = timeval_current();
2997         rec->frozen_on_inactive = false;
2998
2999         se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
3000                                recd_sig_term_handler, rec);
3001         if (se == NULL) {
3002                 DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
3003                 exit(1);
3004         }
3005
3006         if (ctdb->recovery_lock == NULL) {
3007                 struct tevent_timer *tt;
3008
3009                 tt = tevent_add_timer(ctdb->ev,
3010                                       rec,
3011                                       timeval_current_ofs(60, 0),
3012                                       maybe_log_cluster_state,
3013                                       rec);
3014                 if (tt == NULL) {
3015                         DBG_WARNING("Failed to set up cluster state timer\n");
3016                 }
3017         }
3018
3019         /* register a message port for sending memory dumps */
3020         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3021
3022         /* when a node is assigned banning credits */
3023         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
3024                                         banning_handler, rec);
3025
3026         /* register a message port for recovery elections */
3027         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3028
3029         /* when nodes are disabled/enabled */
3030         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3031
3032         /* when we are asked to puch out a flag change */
3033         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3034
3035         /* register a message port for reloadnodes  */
3036         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3037
3038         /* register a message port for performing a takeover run */
3039         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3040
3041         /* register a message port for disabling the ip check for a short while */
3042         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3043
3044         /* register a message port for forcing a rebalance of a node next
3045            reallocation */
3046         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3047
3048         /* Register a message port for disabling takeover runs */
3049         ctdb_client_set_message_handler(ctdb,
3050                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3051                                         disable_takeover_runs_handler, rec);
3052
3053         /* Register a message port for disabling recoveries */
3054         ctdb_client_set_message_handler(ctdb,
3055                                         CTDB_SRVID_DISABLE_RECOVERIES,
3056                                         disable_recoveries_handler, rec);
3057
3058         for (;;) {
3059                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3060                 struct timeval start;
3061                 double elapsed;
3062
3063                 if (!mem_ctx) {
3064                         DEBUG(DEBUG_CRIT,(__location__
3065                                           " Failed to create temp context\n"));
3066                         exit(-1);
3067                 }
3068
3069                 start = timeval_current();
3070                 main_loop(ctdb, rec, mem_ctx);
3071                 talloc_free(mem_ctx);
3072
3073                 /* we only check for recovery once every second */
3074                 elapsed = timeval_elapsed(&start);
3075                 if (elapsed < ctdb->tunable.recover_interval) {
3076                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3077                                           - elapsed);
3078                 }
3079         }
3080 }
3081
3082 /*
3083   event handler for when the main ctdbd dies
3084  */
3085 static void ctdb_recoverd_parent(struct tevent_context *ev,
3086                                  struct tevent_fd *fde,
3087                                  uint16_t flags, void *private_data)
3088 {
3089         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3090         _exit(1);
3091 }
3092
3093 /*
3094   called regularly to verify that the recovery daemon is still running
3095  */
3096 static void ctdb_check_recd(struct tevent_context *ev,
3097                             struct tevent_timer *te,
3098                             struct timeval yt, void *p)
3099 {
3100         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3101
3102         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3103                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3104
3105                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3106                                  ctdb_restart_recd, ctdb);
3107
3108                 return;
3109         }
3110
3111         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3112                          timeval_current_ofs(30, 0),
3113                          ctdb_check_recd, ctdb);
3114 }
3115
3116 static void recd_sig_child_handler(struct tevent_context *ev,
3117                                    struct tevent_signal *se, int signum,
3118                                    int count, void *dont_care,
3119                                    void *private_data)
3120 {
3121 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3122         int status;
3123         pid_t pid = -1;
3124
3125         while (pid != 0) {
3126                 pid = waitpid(-1, &status, WNOHANG);
3127                 if (pid == -1) {
3128                         if (errno != ECHILD) {
3129                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3130                         }
3131                         return;
3132                 }
3133                 if (pid > 0) {
3134                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3135                 }
3136         }
3137 }
3138
3139 /*
3140   startup the recovery daemon as a child of the main ctdb daemon
3141  */
3142 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3143 {
3144         int fd[2];
3145         struct tevent_signal *se;
3146         struct tevent_fd *fde;
3147         int ret;
3148
3149         if (pipe(fd) != 0) {
3150                 return -1;
3151         }
3152
3153         ctdb->recoverd_pid = ctdb_fork(ctdb);
3154         if (ctdb->recoverd_pid == -1) {
3155                 return -1;
3156         }
3157
3158         if (ctdb->recoverd_pid != 0) {
3159                 talloc_free(ctdb->recd_ctx);
3160                 ctdb->recd_ctx = talloc_new(ctdb);
3161                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
3162
3163                 close(fd[0]);
3164                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3165                                  timeval_current_ofs(30, 0),
3166                                  ctdb_check_recd, ctdb);
3167                 return 0;
3168         }
3169
3170         close(fd[1]);
3171
3172         srandom(getpid() ^ time(NULL));
3173
3174         ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
3175         if (ret != 0) {
3176                 return -1;
3177         }
3178
3179         prctl_set_comment("ctdb_recoverd");
3180         if (switch_from_server_to_client(ctdb) != 0) {
3181                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3182                 exit(1);
3183         }
3184
3185         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3186
3187         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
3188                             ctdb_recoverd_parent, &fd[0]);
3189         tevent_fd_set_auto_close(fde);
3190
3191         /* set up a handler to pick up sigchld */
3192         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
3193                                recd_sig_child_handler, ctdb);
3194         if (se == NULL) {
3195                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3196                 exit(1);
3197         }
3198
3199         monitor_cluster(ctdb);
3200
3201         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3202         return -1;
3203 }
3204
3205 /*
3206   shutdown the recovery daemon
3207  */
3208 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3209 {
3210         if (ctdb->recoverd_pid == 0) {
3211                 return;
3212         }
3213
3214         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3215         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3216
3217         TALLOC_FREE(ctdb->recd_ctx);
3218         TALLOC_FREE(ctdb->recd_ping_count);
3219 }
3220
3221 static void ctdb_restart_recd(struct tevent_context *ev,
3222                               struct tevent_timer *te,
3223                               struct timeval t, void *private_data)
3224 {
3225         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3226
3227         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3228         ctdb_stop_recoverd(ctdb);
3229         ctdb_start_recoverd(ctdb);
3230 }