merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31
32
33 struct ban_state {
34         struct ctdb_recoverd *rec;
35         uint32_t banned_node;
36 };
37
38 /*
39   private state of recovery daemon
40  */
41 struct ctdb_recoverd {
42         struct ctdb_context *ctdb;
43         uint32_t last_culprit;
44         uint32_t culprit_counter;
45         struct timeval first_recover_time;
46         struct ban_state **banned_nodes;
47         struct timeval priority_time;
48         bool need_takeover_run;
49         bool need_recovery;
50         uint32_t node_flags;
51         struct timed_event *send_election_te;
52         struct timed_event *election_timeout;
53 };
54
55 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
56 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
57
58
59 struct async_data {
60         uint32_t count;
61         uint32_t fail_count;
62 };
63
64 static void async_callback(struct ctdb_client_control_state *state)
65 {
66         struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
67         int ret;
68         int32_t res;
69
70         /* one more node has responded with recmode data */
71         data->count--;
72
73         /* if we failed to push the db, then return an error and let
74            the main loop try again.
75         */
76         if (state->state != CTDB_CONTROL_DONE) {
77                 DEBUG(0,("Async operation failed with state %d\n", state->state));
78                 data->fail_count++;
79                 return;
80         }
81         
82         state->async.fn = NULL;
83
84         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
85         if ((ret != 0) || (res != 0)) {
86                 DEBUG(0,("Async operation failed with ret=%d res=%d\n", ret, (int)res));
87                 data->fail_count++;
88         }
89 }
90
91
92 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
93 {
94         /* set up the callback functions */
95         state->async.fn = async_callback;
96         state->async.private_data = data;
97         
98         /* one more control to wait for to complete */
99         data->count++;
100 }
101
102
103 /* wait for up to the maximum number of seconds allowed
104    or until all nodes we expect a response from has replied
105 */
106 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
107 {
108         while (data->count > 0) {
109                 event_loop_once(ctdb->ev);
110         }
111         if (data->fail_count != 0) {
112                 DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
113                 return -1;
114         }
115         return 0;
116 }
117
118
119 /*
120   unban a node
121  */
122 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
123 {
124         struct ctdb_context *ctdb = rec->ctdb;
125
126         DEBUG(0,("Unbanning node %u\n", pnn));
127
128         if (!ctdb_validate_pnn(ctdb, pnn)) {
129                 DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
130                 return;
131         }
132
133         /* If we are unbanning a different node then just pass the ban info on */
134         if (pnn != ctdb->pnn) {
135                 TDB_DATA data;
136                 int ret;
137                 
138                 DEBUG(0,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
139
140                 data.dptr = (uint8_t *)&pnn;
141                 data.dsize = sizeof(uint32_t);
142
143                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
144                 if (ret != 0) {
145                         DEBUG(0,("Failed to unban node %u\n", pnn));
146                         return;
147                 }
148
149                 return;
150         }
151
152         /* make sure we remember we are no longer banned in case 
153            there is an election */
154         rec->node_flags &= ~NODE_FLAGS_BANNED;
155
156         DEBUG(0,("Clearing ban flag on node %u\n", pnn));
157         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
158
159         if (rec->banned_nodes[pnn] == NULL) {
160                 DEBUG(0,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
161                 return;
162         }
163
164         talloc_free(rec->banned_nodes[pnn]);
165         rec->banned_nodes[pnn] = NULL;
166 }
167
168
169 /*
170   called when a ban has timed out
171  */
172 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
173 {
174         struct ban_state *state = talloc_get_type(p, struct ban_state);
175         struct ctdb_recoverd *rec = state->rec;
176         uint32_t pnn = state->banned_node;
177
178         DEBUG(0,("Ban timeout. Node %u is now unbanned\n", pnn));
179         ctdb_unban_node(rec, pnn);
180 }
181
182 /*
183   ban a node for a period of time
184  */
185 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
186 {
187         struct ctdb_context *ctdb = rec->ctdb;
188
189         DEBUG(0,("Banning node %u for %u seconds\n", pnn, ban_time));
190
191         if (!ctdb_validate_pnn(ctdb, pnn)) {
192                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
193                 return;
194         }
195
196         if (0 == ctdb->tunable.enable_bans) {
197                 DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
198                 return;
199         }
200
201         /* If we are banning a different node then just pass the ban info on */
202         if (pnn != ctdb->pnn) {
203                 struct ctdb_ban_info b;
204                 TDB_DATA data;
205                 int ret;
206                 
207                 DEBUG(0,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
208
209                 b.pnn = pnn;
210                 b.ban_time = ban_time;
211
212                 data.dptr = (uint8_t *)&b;
213                 data.dsize = sizeof(b);
214
215                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
216                 if (ret != 0) {
217                         DEBUG(0,("Failed to ban node %u\n", pnn));
218                         return;
219                 }
220
221                 return;
222         }
223
224         DEBUG(0,("self ban - lowering our election priority\n"));
225         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
226
227         /* banning ourselves - lower our election priority */
228         rec->priority_time = timeval_current();
229
230         /* make sure we remember we are banned in case there is an 
231            election */
232         rec->node_flags |= NODE_FLAGS_BANNED;
233
234         if (rec->banned_nodes[pnn] != NULL) {
235                 DEBUG(0,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));               
236                 talloc_free(rec->banned_nodes[pnn]);
237                 rec->banned_nodes[pnn] = NULL;
238         }
239
240         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
241         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
242
243         rec->banned_nodes[pnn]->rec = rec;
244         rec->banned_nodes[pnn]->banned_node = pnn;
245
246         if (ban_time != 0) {
247                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
248                                 timeval_current_ofs(ban_time, 0),
249                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
250         }
251 }
252
253 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
254
255
256 /* 
257    perform a simple control on all active nodes. The control cannot return data
258  */
259 static int async_control_on_active_nodes(struct ctdb_context *ctdb, enum ctdb_controls opcode,
260                                          struct ctdb_node_map *nodemap, TDB_DATA data, bool include_self)
261 {
262         struct async_data *async_data;
263         struct ctdb_client_control_state *state;
264         int j;
265         struct timeval timeout = CONTROL_TIMEOUT();
266         
267         async_data = talloc_zero(ctdb, struct async_data);
268         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
269
270         /* loop over all active nodes and send an async control to each of them */
271         for (j=0; j<nodemap->num; j++) {
272                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
273                         continue;
274                 }
275                 if (nodemap->nodes[j].pnn == ctdb->pnn && !include_self) {
276                         continue;
277                 }
278                 state = ctdb_control_send(ctdb, nodemap->nodes[j].pnn, 0, opcode, 
279                                           0, data, async_data, NULL, &timeout, NULL);
280                 if (state == NULL) {
281                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
282                         talloc_free(async_data);
283                         return -1;
284                 }
285                 
286                 async_add(async_data, state);
287         }
288
289         if (async_wait(ctdb, async_data) != 0) {
290                 DEBUG(0,(__location__ " Failed async control %u\n", (unsigned)opcode));
291                 talloc_free(async_data);
292                 return -1;
293         }
294
295         talloc_free(async_data);
296         return 0;
297 }
298
299
300
301 /*
302   change recovery mode on all nodes
303  */
304 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
305 {
306         TDB_DATA data;
307
308         /* freeze all nodes */
309         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
310                 if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_FREEZE, 
311                                                   nodemap, tdb_null, true) != 0) {
312                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
313                         return -1;
314                 }
315         }
316
317
318         data.dsize = sizeof(uint32_t);
319         data.dptr = (unsigned char *)&rec_mode;
320
321         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_SET_RECMODE, 
322                                           nodemap, data, true) != 0) {
323                 DEBUG(0, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
324                 return -1;
325         }
326
327         if (rec_mode == CTDB_RECOVERY_NORMAL) {
328                 if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_THAW, 
329                                                   nodemap, tdb_null, true) != 0) {
330                         DEBUG(0, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
331                         return -1;
332                 }
333         }
334
335         return 0;
336 }
337
338 /*
339   change recovery master on all node
340  */
341 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
342 {
343         TDB_DATA data;
344
345         data.dsize = sizeof(uint32_t);
346         data.dptr = (unsigned char *)&pnn;
347
348         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_SET_RECMASTER, 
349                                           nodemap, data, true) != 0) {
350                 DEBUG(0, (__location__ " Unable to set recmaster. Recovery failed.\n"));
351                 return -1;
352         }
353
354         return 0;
355 }
356
357
358 /*
359   ensure all other nodes have attached to any databases that we have
360  */
361 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
362                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
363 {
364         int i, j, db, ret;
365         struct ctdb_dbid_map *remote_dbmap;
366
367         /* verify that all other nodes have all our databases */
368         for (j=0; j<nodemap->num; j++) {
369                 /* we dont need to ourself ourselves */
370                 if (nodemap->nodes[j].pnn == pnn) {
371                         continue;
372                 }
373                 /* dont check nodes that are unavailable */
374                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
375                         continue;
376                 }
377
378                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
379                                          mem_ctx, &remote_dbmap);
380                 if (ret != 0) {
381                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
382                         return -1;
383                 }
384
385                 /* step through all local databases */
386                 for (db=0; db<dbmap->num;db++) {
387                         const char *name;
388
389
390                         for (i=0;i<remote_dbmap->num;i++) {
391                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
392                                         break;
393                                 }
394                         }
395                         /* the remote node already have this database */
396                         if (i!=remote_dbmap->num) {
397                                 continue;
398                         }
399                         /* ok so we need to create this database */
400                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
401                                             mem_ctx, &name);
402                         if (ret != 0) {
403                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
404                                 return -1;
405                         }
406                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
407                                            mem_ctx, name, dbmap->dbs[db].persistent);
408                         if (ret != 0) {
409                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
410                                 return -1;
411                         }
412                 }
413         }
414
415         return 0;
416 }
417
418
419 /*
420   ensure we are attached to any databases that anyone else is attached to
421  */
422 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
423                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
424 {
425         int i, j, db, ret;
426         struct ctdb_dbid_map *remote_dbmap;
427
428         /* verify that we have all database any other node has */
429         for (j=0; j<nodemap->num; j++) {
430                 /* we dont need to ourself ourselves */
431                 if (nodemap->nodes[j].pnn == pnn) {
432                         continue;
433                 }
434                 /* dont check nodes that are unavailable */
435                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
436                         continue;
437                 }
438
439                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                          mem_ctx, &remote_dbmap);
441                 if (ret != 0) {
442                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
443                         return -1;
444                 }
445
446                 /* step through all databases on the remote node */
447                 for (db=0; db<remote_dbmap->num;db++) {
448                         const char *name;
449
450                         for (i=0;i<(*dbmap)->num;i++) {
451                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
452                                         break;
453                                 }
454                         }
455                         /* we already have this db locally */
456                         if (i!=(*dbmap)->num) {
457                                 continue;
458                         }
459                         /* ok so we need to create this database and
460                            rebuild dbmap
461                          */
462                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
463                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
464                         if (ret != 0) {
465                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
466                                           nodemap->nodes[j].pnn));
467                                 return -1;
468                         }
469                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
470                                            remote_dbmap->dbs[db].persistent);
471                         if (ret != 0) {
472                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
473                                 return -1;
474                         }
475                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
476                         if (ret != 0) {
477                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
478                                 return -1;
479                         }
480                 }
481         }
482
483         return 0;
484 }
485
486
487 /*
488   pull the remote database contents from one node into the recdb
489  */
490 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
491                                     struct tdb_wrap *recdb, uint32_t dbid)
492 {
493         int ret;
494         TDB_DATA outdata;
495         struct ctdb_control_pulldb_reply *reply;
496         struct ctdb_rec_data *rec;
497         int i;
498         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
499
500         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
501                                CONTROL_TIMEOUT(), &outdata);
502         if (ret != 0) {
503                 DEBUG(0,(__location__ " Unable to copy db from node %u\n", srcnode));
504                 talloc_free(tmp_ctx);
505                 return -1;
506         }
507
508         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
509
510         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
511                 DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
512                 talloc_free(tmp_ctx);
513                 return -1;
514         }
515         
516         rec = (struct ctdb_rec_data *)&reply->data[0];
517         
518         for (i=0;
519              i<reply->count;
520              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
521                 TDB_DATA key, data;
522                 struct ctdb_ltdb_header *hdr;
523                 TDB_DATA existing;
524                 
525                 key.dptr = &rec->data[0];
526                 key.dsize = rec->keylen;
527                 data.dptr = &rec->data[key.dsize];
528                 data.dsize = rec->datalen;
529                 
530                 hdr = (struct ctdb_ltdb_header *)data.dptr;
531
532                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
533                         DEBUG(0,(__location__ " bad ltdb record\n"));
534                         talloc_free(tmp_ctx);
535                         return -1;
536                 }
537
538                 /* fetch the existing record, if any */
539                 existing = tdb_fetch(recdb->tdb, key);
540                 
541                 if (existing.dptr != NULL) {
542                         struct ctdb_ltdb_header header;
543                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
544                                 DEBUG(0,(__location__ " Bad record size %u from node %u\n", 
545                                          (unsigned)existing.dsize, srcnode));
546                                 free(existing.dptr);
547                                 talloc_free(tmp_ctx);
548                                 return -1;
549                         }
550                         header = *(struct ctdb_ltdb_header *)existing.dptr;
551                         free(existing.dptr);
552                         if (!(header.rsn < hdr->rsn ||
553                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
554                                 continue;
555                         }
556                 }
557                 
558                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
559                         DEBUG(0,(__location__ " Failed to store record\n"));
560                         talloc_free(tmp_ctx);
561                         return -1;                              
562                 }
563         }
564
565         talloc_free(tmp_ctx);
566
567         return 0;
568 }
569
570 /*
571   pull all the remote database contents into the recdb
572  */
573 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
574                                 struct tdb_wrap *recdb, uint32_t dbid)
575 {
576         int j;
577
578         /* pull all records from all other nodes across onto this node
579            (this merges based on rsn)
580         */
581         for (j=0; j<nodemap->num; j++) {
582                 /* dont merge from nodes that are unavailable */
583                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
584                         continue;
585                 }
586                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
587                         DEBUG(0,(__location__ " Failed to pull remote database from node %u\n", 
588                                  nodemap->nodes[j].pnn));
589                         return -1;
590                 }
591         }
592         
593         return 0;
594 }
595
596
597 /*
598   update flags on all active nodes
599  */
600 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
601 {
602         int i;
603         for (i=0;i<nodemap->num;i++) {
604                 struct ctdb_node_flag_change c;
605                 TDB_DATA data;
606
607                 c.pnn = nodemap->nodes[i].pnn;
608                 c.old_flags = nodemap->nodes[i].flags;
609                 c.new_flags = nodemap->nodes[i].flags;
610
611                 data.dptr = (uint8_t *)&c;
612                 data.dsize = sizeof(c);
613
614                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
615                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
616
617         }
618         return 0;
619 }
620
621
622 /*
623   ensure all nodes have the same vnnmap we do
624  */
625 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
626                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
627 {
628         int j, ret;
629
630         /* push the new vnn map out to all the nodes */
631         for (j=0; j<nodemap->num; j++) {
632                 /* dont push to nodes that are unavailable */
633                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
634                         continue;
635                 }
636
637                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
638                 if (ret != 0) {
639                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
640                         return -1;
641                 }
642         }
643
644         return 0;
645 }
646
647
648 /*
649   handler for when the admin bans a node
650 */
651 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
652                         TDB_DATA data, void *private_data)
653 {
654         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
655         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
656         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
657
658         if (data.dsize != sizeof(*b)) {
659                 DEBUG(0,("Bad data in ban_handler\n"));
660                 talloc_free(mem_ctx);
661                 return;
662         }
663
664         if (b->pnn != ctdb->pnn) {
665                 DEBUG(0,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
666                 return;
667         }
668
669         DEBUG(0,("Node %u has been banned for %u seconds\n", 
670                  b->pnn, b->ban_time));
671
672         ctdb_ban_node(rec, b->pnn, b->ban_time);
673         talloc_free(mem_ctx);
674 }
675
676 /*
677   handler for when the admin unbans a node
678 */
679 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
680                           TDB_DATA data, void *private_data)
681 {
682         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
683         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
684         uint32_t pnn;
685
686         if (data.dsize != sizeof(uint32_t)) {
687                 DEBUG(0,("Bad data in unban_handler\n"));
688                 talloc_free(mem_ctx);
689                 return;
690         }
691         pnn = *(uint32_t *)data.dptr;
692
693         if (pnn != ctdb->pnn) {
694                 DEBUG(0,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
695                 return;
696         }
697
698         DEBUG(0,("Node %u has been unbanned.\n", pnn));
699         ctdb_unban_node(rec, pnn);
700         talloc_free(mem_ctx);
701 }
702
703
704
705 /*
706   called when ctdb_wait_timeout should finish
707  */
708 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
709                               struct timeval yt, void *p)
710 {
711         uint32_t *timed_out = (uint32_t *)p;
712         (*timed_out) = 1;
713 }
714
715 /*
716   wait for a given number of seconds
717  */
718 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
719 {
720         uint32_t timed_out = 0;
721         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
722         while (!timed_out) {
723                 event_loop_once(ctdb->ev);
724         }
725 }
726
727 /*
728   called when an election times out (ends)
729  */
730 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
731                                   struct timeval t, void *p)
732 {
733         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
734         rec->election_timeout = NULL;
735 }
736
737
738 /*
739   wait for an election to finish. It finished election_timeout seconds after
740   the last election packet is received
741  */
742 static void ctdb_wait_election(struct ctdb_recoverd *rec)
743 {
744         struct ctdb_context *ctdb = rec->ctdb;
745         while (rec->election_timeout) {
746                 event_loop_once(ctdb->ev);
747         }
748 }
749
750 /*
751   remember the trouble maker
752  */
753 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
754 {
755         struct ctdb_context *ctdb = rec->ctdb;
756
757         if (rec->last_culprit != culprit ||
758             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
759                 DEBUG(0,("New recovery culprit %u\n", culprit));
760                 /* either a new node is the culprit, or we've decided to forgive them */
761                 rec->last_culprit = culprit;
762                 rec->first_recover_time = timeval_current();
763                 rec->culprit_counter = 0;
764         }
765         rec->culprit_counter++;
766 }
767
768 /*
769   Update our local flags from all remote connected nodes. 
770   This is only run when we are or we belive we are the recovery master
771  */
772 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
773 {
774         int j;
775         struct ctdb_context *ctdb = rec->ctdb;
776         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
777
778         /* get the nodemap for all active remote nodes and verify
779            they are the same as for this node
780          */
781         for (j=0; j<nodemap->num; j++) {
782                 struct ctdb_node_map *remote_nodemap=NULL;
783                 int ret;
784
785                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
786                         continue;
787                 }
788                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
789                         continue;
790                 }
791
792                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
793                                            mem_ctx, &remote_nodemap);
794                 if (ret != 0) {
795                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
796                                   nodemap->nodes[j].pnn));
797                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
798                         talloc_free(mem_ctx);
799                         return MONITOR_FAILED;
800                 }
801                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
802                         struct ctdb_node_flag_change c;
803                         TDB_DATA data;
804
805                         /* We should tell our daemon about this so it
806                            updates its flags or else we will log the same 
807                            message again in the next iteration of recovery.
808                            Since we are the recovery master we can just as
809                            well update the flags on all nodes.
810                         */
811                         c.pnn = nodemap->nodes[j].pnn;
812                         c.old_flags = nodemap->nodes[j].flags;
813                         c.new_flags = remote_nodemap->nodes[j].flags;
814
815                         data.dptr = (uint8_t *)&c;
816                         data.dsize = sizeof(c);
817
818                         ctdb_send_message(ctdb, ctdb->pnn,
819                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
820                                         data);
821
822                         /* Update our local copy of the flags in the recovery
823                            daemon.
824                         */
825                         DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
826                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
827                                  nodemap->nodes[j].flags));
828                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
829
830                         /* If the BANNED flag has changed for the node
831                            this is a good reason to do a new election.
832                          */
833                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
834                                 DEBUG(0,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
835                                  nodemap->nodes[j].pnn, c.new_flags,
836                                  c.old_flags));
837                                 talloc_free(mem_ctx);
838                                 return MONITOR_ELECTION_NEEDED;
839                         }
840
841                 }
842                 talloc_free(remote_nodemap);
843         }
844         talloc_free(mem_ctx);
845         return MONITOR_OK;
846 }
847
848
849 /* Create a new random generation ip. 
850    The generation id can not be the INVALID_GENERATION id
851 */
852 static uint32_t new_generation(void)
853 {
854         uint32_t generation;
855
856         while (1) {
857                 generation = random();
858
859                 if (generation != INVALID_GENERATION) {
860                         break;
861                 }
862         }
863
864         return generation;
865 }
866
867
868 /*
869   create a temporary working database
870  */
871 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
872 {
873         char *name;
874         struct tdb_wrap *recdb;
875
876         /* open up the temporary recovery database */
877         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
878         if (name == NULL) {
879                 return NULL;
880         }
881         unlink(name);
882         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
883                               TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
884         if (recdb == NULL) {
885                 DEBUG(0,(__location__ " Failed to create temp recovery database '%s'\n", name));
886         }
887
888         talloc_free(name);
889
890         return recdb;
891 }
892
893
894 /* 
895    a traverse function for pulling all relevent records from recdb
896  */
897 struct recdb_data {
898         struct ctdb_context *ctdb;
899         struct ctdb_control_pulldb_reply *recdata;
900         uint32_t len;
901         bool failed;
902 };
903
904 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
905 {
906         struct recdb_data *params = (struct recdb_data *)p;
907         struct ctdb_rec_data *rec;
908         struct ctdb_ltdb_header *hdr;
909
910         /* skip empty records */
911         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
912                 return 0;
913         }
914
915         /* update the dmaster field to point to us */
916         hdr = (struct ctdb_ltdb_header *)data.dptr;
917         hdr->dmaster = params->ctdb->pnn;
918
919         /* add the record to the blob ready to send to the nodes */
920         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
921         if (rec == NULL) {
922                 params->failed = true;
923                 return -1;
924         }
925         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
926         if (params->recdata == NULL) {
927                 DEBUG(0,(__location__ " Failed to expand recdata to %u (%u records)\n", 
928                          rec->length + params->len, params->recdata->count));
929                 params->failed = true;
930                 return -1;
931         }
932         params->recdata->count++;
933         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
934         params->len += rec->length;
935         talloc_free(rec);
936
937         return 0;
938 }
939
940 /*
941   push the recdb database out to all nodes
942  */
943 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
944                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
945 {
946         struct recdb_data params;
947         struct ctdb_control_pulldb_reply *recdata;
948         TDB_DATA outdata;
949
950         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
951         CTDB_NO_MEMORY(ctdb, recdata);
952
953         recdata->db_id = dbid;
954
955         params.ctdb = ctdb;
956         params.recdata = recdata;
957         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
958         params.failed = false;
959
960         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
961                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
962                 talloc_free(params.recdata);
963                 return -1;
964         }
965
966         if (params.failed) {
967                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
968                 talloc_free(params.recdata);
969                 return -1;              
970         }
971
972         recdata = params.recdata;
973
974         outdata.dptr = (void *)recdata;
975         outdata.dsize = params.len;
976
977         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_PUSH_DB, nodemap, outdata, true) != 0) {
978                 DEBUG(0,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
979                 talloc_free(recdata);
980                 return -1;
981         }
982
983         DEBUG(0, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
984                   dbid, recdata->count));
985
986         talloc_free(recdata);
987
988         return 0;
989 }
990
991
992 /*
993   go through a full recovery on one database 
994  */
995 static int recover_database(struct ctdb_recoverd *rec, 
996                             TALLOC_CTX *mem_ctx,
997                             uint32_t dbid,
998                             uint32_t pnn, 
999                             struct ctdb_node_map *nodemap,
1000                             uint32_t transaction_id)
1001 {
1002         struct tdb_wrap *recdb;
1003         int ret;
1004         struct ctdb_context *ctdb = rec->ctdb;
1005         TDB_DATA data;
1006         struct ctdb_control_wipe_database w;
1007
1008         recdb = create_recdb(ctdb, mem_ctx);
1009         if (recdb == NULL) {
1010                 return -1;
1011         }
1012
1013         /* pull all remote databases onto the recdb */
1014         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1015         if (ret != 0) {
1016                 DEBUG(0, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1017                 return -1;
1018         }
1019
1020         DEBUG(0, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1021
1022         /* wipe all the remote databases. This is safe as we are in a transaction */
1023         w.db_id = dbid;
1024         w.transaction_id = transaction_id;
1025
1026         data.dptr = (void *)&w;
1027         data.dsize = sizeof(w);
1028
1029         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_WIPE_DATABASE, 
1030                                           nodemap, data, true) != 0) {
1031                 DEBUG(0, (__location__ " Unable to wipe database. Recovery failed.\n"));
1032                 return -1;
1033         }
1034         
1035         /* push out the correct database. This sets the dmaster and skips 
1036            the empty records */
1037         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1038         if (ret != 0) {
1039                 talloc_free(recdb);
1040                 return -1;
1041         }
1042
1043         /* all done with this database */
1044         talloc_free(recdb);
1045
1046         return 0;
1047 }
1048
1049                 
1050 /*
1051   we are the recmaster, and recovery is needed - start a recovery run
1052  */
1053 static int do_recovery(struct ctdb_recoverd *rec, 
1054                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
1055                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1056                        uint32_t culprit)
1057 {
1058         struct ctdb_context *ctdb = rec->ctdb;
1059         int i, j, ret;
1060         uint32_t generation;
1061         struct ctdb_dbid_map *dbmap;
1062         TDB_DATA data;
1063
1064         DEBUG(0, (__location__ " Starting do_recovery\n"));
1065
1066         /* if recovery fails, force it again */
1067         rec->need_recovery = true;
1068
1069         ctdb_set_culprit(rec, culprit);
1070
1071         if (rec->culprit_counter > 2*nodemap->num) {
1072                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1073                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1074                          ctdb->tunable.recovery_ban_period));
1075                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1076         }
1077
1078         if (!ctdb_recovery_lock(ctdb, true)) {
1079                 ctdb_set_culprit(rec, pnn);
1080                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
1081                 return -1;
1082         }
1083
1084         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1085
1086         /* get a list of all databases */
1087         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1088         if (ret != 0) {
1089                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
1090                 return -1;
1091         }
1092
1093         /* we do the db creation before we set the recovery mode, so the freeze happens
1094            on all databases we will be dealing with. */
1095
1096         /* verify that we have all the databases any other node has */
1097         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1098         if (ret != 0) {
1099                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
1100                 return -1;
1101         }
1102
1103         /* verify that all other nodes have all our databases */
1104         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1105         if (ret != 0) {
1106                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
1107                 return -1;
1108         }
1109
1110         DEBUG(0, (__location__ " Recovery - created remote databases\n"));
1111
1112         /* set recovery mode to active on all nodes */
1113         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1114         if (ret!=0) {
1115                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1116                 return -1;
1117         }
1118
1119         /* pick a new generation number */
1120         generation = new_generation();
1121
1122         /* change the vnnmap on this node to use the new generation 
1123            number but not on any other nodes.
1124            this guarantees that if we abort the recovery prematurely
1125            for some reason (a node stops responding?)
1126            that we can just return immediately and we will reenter
1127            recovery shortly again.
1128            I.e. we deliberately leave the cluster with an inconsistent
1129            generation id to allow us to abort recovery at any stage and
1130            just restart it from scratch.
1131          */
1132         vnnmap->generation = generation;
1133         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1134         if (ret != 0) {
1135                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1136                 return -1;
1137         }
1138
1139         data.dptr = (void *)&generation;
1140         data.dsize = sizeof(uint32_t);
1141
1142         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_TRANSACTION_START, 
1143                                           nodemap, data, true) != 0) {
1144                 DEBUG(0, (__location__ " Unable to start transactions. Recovery failed.\n"));
1145                 return -1;
1146         }
1147
1148         DEBUG(0,(__location__ " started transactions on all nodes\n"));
1149
1150         for (i=0;i<dbmap->num;i++) {
1151                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1152                         DEBUG(0, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1153                         return -1;
1154                 }
1155         }
1156
1157         DEBUG(0, (__location__ " Recovery - starting database commits\n"));
1158
1159         /* commit all the changes */
1160         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT, 
1161                                           nodemap, data, true) != 0) {
1162                 DEBUG(0, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1163                 return -1;
1164         }
1165
1166         DEBUG(0, (__location__ " Recovery - committed databases\n"));
1167         
1168
1169         /* build a new vnn map with all the currently active and
1170            unbanned nodes */
1171         generation = new_generation();
1172         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1173         CTDB_NO_MEMORY(ctdb, vnnmap);
1174         vnnmap->generation = generation;
1175         vnnmap->size = num_active;
1176         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1177         for (i=j=0;i<nodemap->num;i++) {
1178                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1179                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
1180                 }
1181         }
1182
1183         /* update to the new vnnmap on all nodes */
1184         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1185         if (ret != 0) {
1186                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
1187                 return -1;
1188         }
1189
1190         DEBUG(0, (__location__ " Recovery - updated vnnmap\n"));
1191
1192         /* update recmaster to point to us for all nodes */
1193         ret = set_recovery_master(ctdb, nodemap, pnn);
1194         if (ret!=0) {
1195                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
1196                 return -1;
1197         }
1198
1199         DEBUG(0, (__location__ " Recovery - updated recmaster\n"));
1200
1201         /*
1202           update all nodes to have the same flags that we have
1203          */
1204         ret = update_flags_on_all_nodes(ctdb, nodemap);
1205         if (ret != 0) {
1206                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
1207                 return -1;
1208         }
1209         
1210         DEBUG(0, (__location__ " Recovery - updated flags\n"));
1211
1212         /*
1213           if enabled, tell nodes to takeover their public IPs
1214          */
1215         if (ctdb->vnn) {
1216                 rec->need_takeover_run = false;
1217                 ret = ctdb_takeover_run(ctdb, nodemap);
1218                 if (ret != 0) {
1219                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1220                         return -1;
1221                 }
1222                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
1223         }
1224
1225         /* disable recovery mode */
1226         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1227         if (ret!=0) {
1228                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1229                 return -1;
1230         }
1231
1232         /* send a message to all clients telling them that the cluster 
1233            has been reconfigured */
1234         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1235
1236         DEBUG(0, (__location__ " Recovery complete\n"));
1237
1238         rec->need_recovery = false;
1239
1240         /* We just finished a recovery successfully. 
1241            We now wait for rerecovery_timeout before we allow 
1242            another recovery to take place.
1243         */
1244         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1245         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1246         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1247
1248         return 0;
1249 }
1250
1251
1252 /*
1253   elections are won by first checking the number of connected nodes, then
1254   the priority time, then the pnn
1255  */
1256 struct election_message {
1257         uint32_t num_connected;
1258         struct timeval priority_time;
1259         uint32_t pnn;
1260         uint32_t node_flags;
1261 };
1262
1263 /*
1264   form this nodes election data
1265  */
1266 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1267 {
1268         int ret, i;
1269         struct ctdb_node_map *nodemap;
1270         struct ctdb_context *ctdb = rec->ctdb;
1271
1272         ZERO_STRUCTP(em);
1273
1274         em->pnn = rec->ctdb->pnn;
1275         em->priority_time = rec->priority_time;
1276         em->node_flags = rec->node_flags;
1277
1278         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1279         if (ret != 0) {
1280                 DEBUG(0,(__location__ " unable to get election data\n"));
1281                 return;
1282         }
1283
1284         for (i=0;i<nodemap->num;i++) {
1285                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1286                         em->num_connected++;
1287                 }
1288         }
1289         talloc_free(nodemap);
1290 }
1291
1292 /*
1293   see if the given election data wins
1294  */
1295 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1296 {
1297         struct election_message myem;
1298         int cmp = 0;
1299
1300         ctdb_election_data(rec, &myem);
1301
1302         /* we cant win if we are banned */
1303         if (rec->node_flags & NODE_FLAGS_BANNED) {
1304                 return false;
1305         }       
1306
1307         /* we will automatically win if the other node is banned */
1308         if (em->node_flags & NODE_FLAGS_BANNED) {
1309                 return true;
1310         }
1311
1312         /* try to use the most connected node */
1313         if (cmp == 0) {
1314                 cmp = (int)myem.num_connected - (int)em->num_connected;
1315         }
1316
1317         /* then the longest running node */
1318         if (cmp == 0) {
1319                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1320         }
1321
1322         if (cmp == 0) {
1323                 cmp = (int)myem.pnn - (int)em->pnn;
1324         }
1325
1326         return cmp > 0;
1327 }
1328
1329 /*
1330   send out an election request
1331  */
1332 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1333 {
1334         int ret;
1335         TDB_DATA election_data;
1336         struct election_message emsg;
1337         uint64_t srvid;
1338         struct ctdb_context *ctdb = rec->ctdb;
1339
1340         srvid = CTDB_SRVID_RECOVERY;
1341
1342         ctdb_election_data(rec, &emsg);
1343
1344         election_data.dsize = sizeof(struct election_message);
1345         election_data.dptr  = (unsigned char *)&emsg;
1346
1347
1348         /* first we assume we will win the election and set 
1349            recoverymaster to be ourself on the current node
1350          */
1351         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1352         if (ret != 0) {
1353                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1354                 return -1;
1355         }
1356
1357
1358         /* send an election message to all active nodes */
1359         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1360
1361         return 0;
1362 }
1363
1364 /*
1365   this function will unban all nodes in the cluster
1366 */
1367 static void unban_all_nodes(struct ctdb_context *ctdb)
1368 {
1369         int ret, i;
1370         struct ctdb_node_map *nodemap;
1371         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1372         
1373         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1374         if (ret != 0) {
1375                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1376                 return;
1377         }
1378
1379         for (i=0;i<nodemap->num;i++) {
1380                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1381                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1382                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1383                 }
1384         }
1385
1386         talloc_free(tmp_ctx);
1387 }
1388
1389
1390 /*
1391   we think we are winning the election - send a broadcast election request
1392  */
1393 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1394 {
1395         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1396         int ret;
1397
1398         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1399         if (ret != 0) {
1400                 DEBUG(0,("Failed to send election request!\n"));
1401         }
1402
1403         talloc_free(rec->send_election_te);
1404         rec->send_election_te = NULL;
1405 }
1406
1407 /*
1408   handler for recovery master elections
1409 */
1410 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1411                              TDB_DATA data, void *private_data)
1412 {
1413         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1414         int ret;
1415         struct election_message *em = (struct election_message *)data.dptr;
1416         TALLOC_CTX *mem_ctx;
1417
1418         /* we got an election packet - update the timeout for the election */
1419         talloc_free(rec->election_timeout);
1420         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1421                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1422                                                 ctdb_election_timeout, rec);
1423
1424         mem_ctx = talloc_new(ctdb);
1425
1426         /* someone called an election. check their election data
1427            and if we disagree and we would rather be the elected node, 
1428            send a new election message to all other nodes
1429          */
1430         if (ctdb_election_win(rec, em)) {
1431                 if (!rec->send_election_te) {
1432                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1433                                                                 timeval_current_ofs(0, 500000),
1434                                                                 election_send_request, rec);
1435                 }
1436                 talloc_free(mem_ctx);
1437                 /*unban_all_nodes(ctdb);*/
1438                 return;
1439         }
1440         
1441         /* we didn't win */
1442         talloc_free(rec->send_election_te);
1443         rec->send_election_te = NULL;
1444
1445         /* release the recmaster lock */
1446         if (em->pnn != ctdb->pnn &&
1447             ctdb->recovery_lock_fd != -1) {
1448                 close(ctdb->recovery_lock_fd);
1449                 ctdb->recovery_lock_fd = -1;
1450                 unban_all_nodes(ctdb);
1451         }
1452
1453         /* ok, let that guy become recmaster then */
1454         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1455         if (ret != 0) {
1456                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1457                 talloc_free(mem_ctx);
1458                 return;
1459         }
1460
1461         /* release any bans */
1462         rec->last_culprit = (uint32_t)-1;
1463         talloc_free(rec->banned_nodes);
1464         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1465         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1466
1467         talloc_free(mem_ctx);
1468         return;
1469 }
1470
1471
1472 /*
1473   force the start of the election process
1474  */
1475 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1476                            struct ctdb_node_map *nodemap)
1477 {
1478         int ret;
1479         struct ctdb_context *ctdb = rec->ctdb;
1480
1481         /* set all nodes to recovery mode to stop all internode traffic */
1482         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1483         if (ret!=0) {
1484                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1485                 return;
1486         }
1487
1488         talloc_free(rec->election_timeout);
1489         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1490                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1491                                                 ctdb_election_timeout, rec);
1492
1493         ret = send_election_request(rec, pnn);
1494         if (ret!=0) {
1495                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1496                 return;
1497         }
1498
1499         /* wait for a few seconds to collect all responses */
1500         ctdb_wait_election(rec);
1501 }
1502
1503
1504
1505 /*
1506   handler for when a node changes its flags
1507 */
1508 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1509                             TDB_DATA data, void *private_data)
1510 {
1511         int ret;
1512         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1513         struct ctdb_node_map *nodemap=NULL;
1514         TALLOC_CTX *tmp_ctx;
1515         uint32_t changed_flags;
1516         int i;
1517         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1518
1519         if (data.dsize != sizeof(*c)) {
1520                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1521                 return;
1522         }
1523
1524         tmp_ctx = talloc_new(ctdb);
1525         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1526
1527         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1528         if (ret != 0) {
1529                 DEBUG(0,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1530                 talloc_free(tmp_ctx);
1531                 return;         
1532         }
1533
1534
1535         for (i=0;i<nodemap->num;i++) {
1536                 if (nodemap->nodes[i].pnn == c->pnn) break;
1537         }
1538
1539         if (i == nodemap->num) {
1540                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1541                 talloc_free(tmp_ctx);
1542                 return;
1543         }
1544
1545         changed_flags = c->old_flags ^ c->new_flags;
1546
1547         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1548            This flag is handled locally based on whether the local node
1549            can communicate with the node or not.
1550         */
1551         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1552         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1553                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1554         }
1555
1556         if (nodemap->nodes[i].flags != c->new_flags) {
1557                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1558         }
1559
1560         nodemap->nodes[i].flags = c->new_flags;
1561
1562         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1563                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1564
1565         if (ret == 0) {
1566                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1567                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1568         }
1569         
1570         if (ret == 0 &&
1571             ctdb->recovery_master == ctdb->pnn &&
1572             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1573             ctdb->vnn) {
1574                 /* Only do the takeover run if the perm disabled or unhealthy
1575                    flags changed since these will cause an ip failover but not
1576                    a recovery.
1577                    If the node became disconnected or banned this will also
1578                    lead to an ip address failover but that is handled 
1579                    during recovery
1580                 */
1581                 if (changed_flags & NODE_FLAGS_DISABLED) {
1582                         rec->need_takeover_run = true;
1583                 }
1584         }
1585
1586         talloc_free(tmp_ctx);
1587 }
1588
1589
1590
1591 struct verify_recmode_normal_data {
1592         uint32_t count;
1593         enum monitor_result status;
1594 };
1595
1596 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1597 {
1598         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1599
1600
1601         /* one more node has responded with recmode data*/
1602         rmdata->count--;
1603
1604         /* if we failed to get the recmode, then return an error and let
1605            the main loop try again.
1606         */
1607         if (state->state != CTDB_CONTROL_DONE) {
1608                 if (rmdata->status == MONITOR_OK) {
1609                         rmdata->status = MONITOR_FAILED;
1610                 }
1611                 return;
1612         }
1613
1614         /* if we got a response, then the recmode will be stored in the
1615            status field
1616         */
1617         if (state->status != CTDB_RECOVERY_NORMAL) {
1618                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1619                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1620         }
1621
1622         return;
1623 }
1624
1625
1626 /* verify that all nodes are in normal recovery mode */
1627 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1628 {
1629         struct verify_recmode_normal_data *rmdata;
1630         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1631         struct ctdb_client_control_state *state;
1632         enum monitor_result status;
1633         int j;
1634         
1635         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1636         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1637         rmdata->count  = 0;
1638         rmdata->status = MONITOR_OK;
1639
1640         /* loop over all active nodes and send an async getrecmode call to 
1641            them*/
1642         for (j=0; j<nodemap->num; j++) {
1643                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1644                         continue;
1645                 }
1646                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1647                                         CONTROL_TIMEOUT(), 
1648                                         nodemap->nodes[j].pnn);
1649                 if (state == NULL) {
1650                         /* we failed to send the control, treat this as 
1651                            an error and try again next iteration
1652                         */                      
1653                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1654                         talloc_free(mem_ctx);
1655                         return MONITOR_FAILED;
1656                 }
1657
1658                 /* set up the callback functions */
1659                 state->async.fn = verify_recmode_normal_callback;
1660                 state->async.private_data = rmdata;
1661
1662                 /* one more control to wait for to complete */
1663                 rmdata->count++;
1664         }
1665
1666
1667         /* now wait for up to the maximum number of seconds allowed
1668            or until all nodes we expect a response from has replied
1669         */
1670         while (rmdata->count > 0) {
1671                 event_loop_once(ctdb->ev);
1672         }
1673
1674         status = rmdata->status;
1675         talloc_free(mem_ctx);
1676         return status;
1677 }
1678
1679
1680 struct verify_recmaster_data {
1681         uint32_t count;
1682         uint32_t pnn;
1683         enum monitor_result status;
1684 };
1685
1686 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1687 {
1688         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1689
1690
1691         /* one more node has responded with recmaster data*/
1692         rmdata->count--;
1693
1694         /* if we failed to get the recmaster, then return an error and let
1695            the main loop try again.
1696         */
1697         if (state->state != CTDB_CONTROL_DONE) {
1698                 if (rmdata->status == MONITOR_OK) {
1699                         rmdata->status = MONITOR_FAILED;
1700                 }
1701                 return;
1702         }
1703
1704         /* if we got a response, then the recmaster will be stored in the
1705            status field
1706         */
1707         if (state->status != rmdata->pnn) {
1708                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1709                 rmdata->status = MONITOR_ELECTION_NEEDED;
1710         }
1711
1712         return;
1713 }
1714
1715
1716 /* verify that all nodes agree that we are the recmaster */
1717 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1718 {
1719         struct verify_recmaster_data *rmdata;
1720         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1721         struct ctdb_client_control_state *state;
1722         enum monitor_result status;
1723         int j;
1724         
1725         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1726         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1727         rmdata->count  = 0;
1728         rmdata->pnn    = pnn;
1729         rmdata->status = MONITOR_OK;
1730
1731         /* loop over all active nodes and send an async getrecmaster call to 
1732            them*/
1733         for (j=0; j<nodemap->num; j++) {
1734                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1735                         continue;
1736                 }
1737                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1738                                         CONTROL_TIMEOUT(),
1739                                         nodemap->nodes[j].pnn);
1740                 if (state == NULL) {
1741                         /* we failed to send the control, treat this as 
1742                            an error and try again next iteration
1743                         */                      
1744                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1745                         talloc_free(mem_ctx);
1746                         return MONITOR_FAILED;
1747                 }
1748
1749                 /* set up the callback functions */
1750                 state->async.fn = verify_recmaster_callback;
1751                 state->async.private_data = rmdata;
1752
1753                 /* one more control to wait for to complete */
1754                 rmdata->count++;
1755         }
1756
1757
1758         /* now wait for up to the maximum number of seconds allowed
1759            or until all nodes we expect a response from has replied
1760         */
1761         while (rmdata->count > 0) {
1762                 event_loop_once(ctdb->ev);
1763         }
1764
1765         status = rmdata->status;
1766         talloc_free(mem_ctx);
1767         return status;
1768 }
1769
1770
1771 /*
1772   the main monitoring loop
1773  */
1774 static void monitor_cluster(struct ctdb_context *ctdb)
1775 {
1776         uint32_t pnn, num_active, recmaster;
1777         TALLOC_CTX *mem_ctx=NULL;
1778         struct ctdb_node_map *nodemap=NULL;
1779         struct ctdb_node_map *remote_nodemap=NULL;
1780         struct ctdb_vnn_map *vnnmap=NULL;
1781         struct ctdb_vnn_map *remote_vnnmap=NULL;
1782         int i, j, ret;
1783         struct ctdb_recoverd *rec;
1784         struct ctdb_all_public_ips *ips;
1785         char c;
1786
1787         DEBUG(0,("monitor_cluster starting\n"));
1788
1789         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1790         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1791
1792         rec->ctdb = ctdb;
1793         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1794         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1795
1796         rec->priority_time = timeval_current();
1797
1798         /* register a message port for recovery elections */
1799         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1800
1801         /* and one for when nodes are disabled/enabled */
1802         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1803
1804         /* and one for when nodes are banned */
1805         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1806
1807         /* and one for when nodes are unbanned */
1808         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1809         
1810 again:
1811         if (mem_ctx) {
1812                 talloc_free(mem_ctx);
1813                 mem_ctx = NULL;
1814         }
1815         mem_ctx = talloc_new(ctdb);
1816         if (!mem_ctx) {
1817                 DEBUG(0,("Failed to create temporary context\n"));
1818                 exit(-1);
1819         }
1820
1821         /* we only check for recovery once every second */
1822         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1823
1824         if (rec->election_timeout) {
1825                 /* an election is in progress */
1826                 goto again;
1827         }
1828
1829
1830         /* We must check if we need to ban a node here but we want to do this
1831            as early as possible so we dont wait until we have pulled the node
1832            map from the local node. thats why we have the hardcoded value 20
1833         */
1834         if (rec->culprit_counter > 20) {
1835                 DEBUG(0,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
1836                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1837                          ctdb->tunable.recovery_ban_period));
1838                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
1839         }
1840
1841         /* get relevant tunables */
1842         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1843         if (ret != 0) {
1844                 DEBUG(0,("Failed to get tunables - retrying\n"));
1845                 goto again;
1846         }
1847
1848         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1849         if (pnn == (uint32_t)-1) {
1850                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1851                 goto again;
1852         }
1853
1854         /* get the vnnmap */
1855         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1856         if (ret != 0) {
1857                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1858                 goto again;
1859         }
1860
1861
1862         /* get number of nodes */
1863         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1864         if (ret != 0) {
1865                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1866                 goto again;
1867         }
1868
1869         /* check which node is the recovery master */
1870         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1871         if (ret != 0) {
1872                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1873                 goto again;
1874         }
1875
1876         if (recmaster == (uint32_t)-1) {
1877                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1878                 force_election(rec, mem_ctx, pnn, nodemap);
1879                 goto again;
1880         }
1881         
1882         /* check that we (recovery daemon) and the local ctdb daemon
1883            agrees on whether we are banned or not
1884         */
1885         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
1886                 if (rec->banned_nodes[pnn] == NULL) {
1887                         if (recmaster == pnn) {
1888                                 DEBUG(0,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
1889
1890                                 ctdb_unban_node(rec, pnn);
1891                         } else {
1892                                 DEBUG(0,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
1893                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1894                                 ctdb_set_culprit(rec, pnn);
1895                         }
1896                         goto again;
1897                 }
1898         } else {
1899                 if (rec->banned_nodes[pnn] != NULL) {
1900                         if (recmaster == pnn) {
1901                                 DEBUG(0,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
1902
1903                                 ctdb_unban_node(rec, pnn);
1904                         } else {
1905                                 DEBUG(0,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
1906
1907                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1908                                 ctdb_set_culprit(rec, pnn);
1909                         }
1910                         goto again;
1911                 }
1912         }
1913
1914         /* remember our own node flags */
1915         rec->node_flags = nodemap->nodes[pnn].flags;
1916
1917         /* count how many active nodes there are */
1918         num_active = 0;
1919         for (i=0; i<nodemap->num; i++) {
1920                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1921                         num_active++;
1922                 }
1923         }
1924
1925
1926         /* verify that the recmaster node is still active */
1927         for (j=0; j<nodemap->num; j++) {
1928                 if (nodemap->nodes[j].pnn==recmaster) {
1929                         break;
1930                 }
1931         }
1932
1933         if (j == nodemap->num) {
1934                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1935                 force_election(rec, mem_ctx, pnn, nodemap);
1936                 goto again;
1937         }
1938
1939         /* if recovery master is disconnected we must elect a new recmaster */
1940         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1941                 DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
1942                 force_election(rec, mem_ctx, pnn, nodemap);
1943                 goto again;
1944         }
1945
1946         /* grap the nodemap from the recovery master to check if it is banned */
1947         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1948                                    mem_ctx, &remote_nodemap);
1949         if (ret != 0) {
1950                 DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
1951                           nodemap->nodes[j].pnn));
1952                 goto again;
1953         }
1954
1955
1956         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1957                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1958                 force_election(rec, mem_ctx, pnn, nodemap);
1959                 goto again;
1960         }
1961
1962         /* verify that the public ip address allocation is consistent */
1963         if (ctdb->vnn != NULL) {
1964                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
1965                 if (ret != 0) {
1966                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
1967                         goto again;
1968                 }
1969                 for (j=0; j<ips->num; j++) {
1970                         /* verify that we have the ip addresses we should have
1971                            and we dont have ones we shouldnt have.
1972                            if we find an inconsistency we set recmode to
1973                            active on the local node and wait for the recmaster
1974                            to do a full blown recovery
1975                         */
1976                         if (ips->ips[j].pnn == pnn) {
1977                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
1978                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1979                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1980                                         if (ret != 0) {
1981                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1982                                                 goto again;
1983                                         }
1984                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1985                                         if (ret != 0) {
1986                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1987                                                 goto again;
1988                                         }
1989                                 }
1990                         } else {
1991                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
1992                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1993                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1994                                         if (ret != 0) {
1995                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1996                                                 goto again;
1997                                         }
1998                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1999                                         if (ret != 0) {
2000                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2001                                                 goto again;
2002                                         }
2003                                 }
2004                         }
2005                 }
2006         }
2007
2008         /* if we are not the recmaster then we do not need to check
2009            if recovery is needed
2010          */
2011         if (pnn != recmaster) {
2012                 goto again;
2013         }
2014
2015
2016         /* ensure our local copies of flags are right */
2017         ret = update_local_flags(rec, nodemap);
2018         if (ret == MONITOR_ELECTION_NEEDED) {
2019                 DEBUG(0,("update_local_flags() called for a re-election.\n"));
2020                 force_election(rec, mem_ctx, pnn, nodemap);
2021                 goto again;
2022         }
2023         if (ret != MONITOR_OK) {
2024                 DEBUG(0,("Unable to update local flags\n"));
2025                 goto again;
2026         }
2027
2028         /* update the list of public ips that a node can handle for
2029            all connected nodes
2030         */
2031         for (j=0; j<nodemap->num; j++) {
2032                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2033                         continue;
2034                 }
2035                 /* release any existing data */
2036                 if (ctdb->nodes[j]->public_ips) {
2037                         talloc_free(ctdb->nodes[j]->public_ips);
2038                         ctdb->nodes[j]->public_ips = NULL;
2039                 }
2040                 /* grab a new shiny list of public ips from the node */
2041                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2042                         ctdb->nodes[j]->pnn, 
2043                         ctdb->nodes,
2044                         &ctdb->nodes[j]->public_ips)) {
2045                         DEBUG(0,("Failed to read public ips from node : %u\n", 
2046                                 ctdb->nodes[j]->pnn));
2047                         goto again;
2048                 }
2049         }
2050
2051
2052         /* verify that all active nodes agree that we are the recmaster */
2053         switch (verify_recmaster(ctdb, nodemap, pnn)) {
2054         case MONITOR_RECOVERY_NEEDED:
2055                 /* can not happen */
2056                 goto again;
2057         case MONITOR_ELECTION_NEEDED:
2058                 force_election(rec, mem_ctx, pnn, nodemap);
2059                 goto again;
2060         case MONITOR_OK:
2061                 break;
2062         case MONITOR_FAILED:
2063                 goto again;
2064         }
2065
2066
2067         if (rec->need_recovery) {
2068                 /* a previous recovery didn't finish */
2069                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2070                 goto again;             
2071         }
2072
2073         /* verify that all active nodes are in normal mode 
2074            and not in recovery mode 
2075          */
2076         switch (verify_recmode(ctdb, nodemap)) {
2077         case MONITOR_RECOVERY_NEEDED:
2078                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2079                 goto again;
2080         case MONITOR_FAILED:
2081                 goto again;
2082         case MONITOR_ELECTION_NEEDED:
2083                 /* can not happen */
2084         case MONITOR_OK:
2085                 break;
2086         }
2087
2088
2089         /* we should have the reclock - check its not stale */
2090         if (ctdb->recovery_lock_fd == -1) {
2091                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
2092                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2093                 goto again;
2094         }
2095
2096         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
2097                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2098                 close(ctdb->recovery_lock_fd);
2099                 ctdb->recovery_lock_fd = -1;
2100                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2101                 goto again;
2102         }
2103
2104         /* get the nodemap for all active remote nodes and verify
2105            they are the same as for this node
2106          */
2107         for (j=0; j<nodemap->num; j++) {
2108                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2109                         continue;
2110                 }
2111                 if (nodemap->nodes[j].pnn == pnn) {
2112                         continue;
2113                 }
2114
2115                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2116                                            mem_ctx, &remote_nodemap);
2117                 if (ret != 0) {
2118                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
2119                                   nodemap->nodes[j].pnn));
2120                         goto again;
2121                 }
2122
2123                 /* if the nodes disagree on how many nodes there are
2124                    then this is a good reason to try recovery
2125                  */
2126                 if (remote_nodemap->num != nodemap->num) {
2127                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2128                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2129                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2130                         goto again;
2131                 }
2132
2133                 /* if the nodes disagree on which nodes exist and are
2134                    active, then that is also a good reason to do recovery
2135                  */
2136                 for (i=0;i<nodemap->num;i++) {
2137                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2138                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2139                                           nodemap->nodes[j].pnn, i, 
2140                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2141                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2142                                             vnnmap, nodemap->nodes[j].pnn);
2143                                 goto again;
2144                         }
2145                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2146                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2147                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2148                                           nodemap->nodes[j].pnn, i,
2149                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2150                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2151                                             vnnmap, nodemap->nodes[j].pnn);
2152                                 goto again;
2153                         }
2154                 }
2155
2156         }
2157
2158
2159         /* there better be the same number of lmasters in the vnn map
2160            as there are active nodes or we will have to do a recovery
2161          */
2162         if (vnnmap->size != num_active) {
2163                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2164                           vnnmap->size, num_active));
2165                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2166                 goto again;
2167         }
2168
2169         /* verify that all active nodes in the nodemap also exist in 
2170            the vnnmap.
2171          */
2172         for (j=0; j<nodemap->num; j++) {
2173                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2174                         continue;
2175                 }
2176                 if (nodemap->nodes[j].pnn == pnn) {
2177                         continue;
2178                 }
2179
2180                 for (i=0; i<vnnmap->size; i++) {
2181                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2182                                 break;
2183                         }
2184                 }
2185                 if (i == vnnmap->size) {
2186                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2187                                   nodemap->nodes[j].pnn));
2188                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2189                         goto again;
2190                 }
2191         }
2192
2193         
2194         /* verify that all other nodes have the same vnnmap
2195            and are from the same generation
2196          */
2197         for (j=0; j<nodemap->num; j++) {
2198                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2199                         continue;
2200                 }
2201                 if (nodemap->nodes[j].pnn == pnn) {
2202                         continue;
2203                 }
2204
2205                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2206                                           mem_ctx, &remote_vnnmap);
2207                 if (ret != 0) {
2208                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
2209                                   nodemap->nodes[j].pnn));
2210                         goto again;
2211                 }
2212
2213                 /* verify the vnnmap generation is the same */
2214                 if (vnnmap->generation != remote_vnnmap->generation) {
2215                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2216                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2217                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2218                         goto again;
2219                 }
2220
2221                 /* verify the vnnmap size is the same */
2222                 if (vnnmap->size != remote_vnnmap->size) {
2223                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2224                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2225                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2226                         goto again;
2227                 }
2228
2229                 /* verify the vnnmap is the same */
2230                 for (i=0;i<vnnmap->size;i++) {
2231                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2232                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
2233                                           nodemap->nodes[j].pnn));
2234                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2235                                             vnnmap, nodemap->nodes[j].pnn);
2236                                 goto again;
2237                         }
2238                 }
2239         }
2240
2241         /* we might need to change who has what IP assigned */
2242         if (rec->need_takeover_run) {
2243                 rec->need_takeover_run = false;
2244                 ret = ctdb_takeover_run(ctdb, nodemap);
2245                 if (ret != 0) {
2246                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2247                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2248                                     vnnmap, ctdb->pnn);
2249                 }
2250         }
2251
2252         goto again;
2253
2254 }
2255
2256 /*
2257   event handler for when the main ctdbd dies
2258  */
2259 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2260                                  uint16_t flags, void *private_data)
2261 {
2262         DEBUG(0,("recovery daemon parent died - exiting\n"));
2263         _exit(1);
2264 }
2265
2266 /*
2267   startup the recovery daemon as a child of the main ctdb daemon
2268  */
2269 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2270 {
2271         int ret;
2272         int fd[2];
2273
2274         if (pipe(fd) != 0) {
2275                 return -1;
2276         }
2277
2278         ctdb->recoverd_pid = fork();
2279         if (ctdb->recoverd_pid == -1) {
2280                 return -1;
2281         }
2282         
2283         if (ctdb->recoverd_pid != 0) {
2284                 close(fd[0]);
2285                 return 0;
2286         }
2287
2288         close(fd[1]);
2289
2290         /* shutdown the transport */
2291         ctdb->methods->shutdown(ctdb);
2292
2293         /* get a new event context */
2294         talloc_free(ctdb->ev);
2295         ctdb->ev = event_context_init(ctdb);
2296
2297         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2298                      ctdb_recoverd_parent, &fd[0]);     
2299
2300         close(ctdb->daemon.sd);
2301         ctdb->daemon.sd = -1;
2302
2303         srandom(getpid() ^ time(NULL));
2304
2305         /* initialise ctdb */
2306         ret = ctdb_socket_connect(ctdb);
2307         if (ret != 0) {
2308                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
2309                 exit(1);
2310         }
2311
2312         monitor_cluster(ctdb);
2313
2314         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
2315         return -1;
2316 }
2317
2318 /*
2319   shutdown the recovery daemon
2320  */
2321 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2322 {
2323         if (ctdb->recoverd_pid == 0) {
2324                 return;
2325         }
2326
2327         DEBUG(0,("Shutting down recovery daemon\n"));
2328         kill(ctdb->recoverd_pid, SIGTERM);
2329 }