fixed realloc bug
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         int rec_file_fd;
45         uint32_t recmaster;
46         uint32_t num_active;
47         uint32_t num_connected;
48         struct ctdb_node_map *nodemap;
49         uint32_t last_culprit;
50         uint32_t culprit_counter;
51         struct timeval first_recover_time;
52         struct ban_state **banned_nodes;
53         struct timeval priority_time;
54         bool need_takeover_run;
55         bool need_recovery;
56         uint32_t node_flags;
57         struct timed_event *send_election_te;
58         struct timed_event *election_timeout;
59         struct vacuum_info *vacuum_info;
60 };
61
62 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
63 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
64
65
66 /*
67   unban a node
68  */
69 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
70 {
71         struct ctdb_context *ctdb = rec->ctdb;
72
73         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
74
75         if (!ctdb_validate_pnn(ctdb, pnn)) {
76                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
77                 return;
78         }
79
80         /* If we are unbanning a different node then just pass the ban info on */
81         if (pnn != ctdb->pnn) {
82                 TDB_DATA data;
83                 int ret;
84                 
85                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
86
87                 data.dptr = (uint8_t *)&pnn;
88                 data.dsize = sizeof(uint32_t);
89
90                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
91                 if (ret != 0) {
92                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
93                         return;
94                 }
95
96                 return;
97         }
98
99         /* make sure we remember we are no longer banned in case 
100            there is an election */
101         rec->node_flags &= ~NODE_FLAGS_BANNED;
102
103         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
105
106         if (rec->banned_nodes[pnn] == NULL) {
107                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
108                 return;
109         }
110
111         talloc_free(rec->banned_nodes[pnn]);
112         rec->banned_nodes[pnn] = NULL;
113 }
114
115
116 /*
117   called when a ban has timed out
118  */
119 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
120 {
121         struct ban_state *state = talloc_get_type(p, struct ban_state);
122         struct ctdb_recoverd *rec = state->rec;
123         uint32_t pnn = state->banned_node;
124
125         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
126         ctdb_unban_node(rec, pnn);
127 }
128
129 /*
130   ban a node for a period of time
131  */
132 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
133 {
134         struct ctdb_context *ctdb = rec->ctdb;
135
136         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
137
138         if (!ctdb_validate_pnn(ctdb, pnn)) {
139                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
140                 return;
141         }
142
143         if (0 == ctdb->tunable.enable_bans) {
144                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
145                 return;
146         }
147
148         /* If we are banning a different node then just pass the ban info on */
149         if (pnn != ctdb->pnn) {
150                 struct ctdb_ban_info b;
151                 TDB_DATA data;
152                 int ret;
153                 
154                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
155
156                 b.pnn = pnn;
157                 b.ban_time = ban_time;
158
159                 data.dptr = (uint8_t *)&b;
160                 data.dsize = sizeof(b);
161
162                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
163                 if (ret != 0) {
164                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
165                         return;
166                 }
167
168                 return;
169         }
170
171         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
172         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
173
174         /* banning ourselves - lower our election priority */
175         rec->priority_time = timeval_current();
176
177         /* make sure we remember we are banned in case there is an 
178            election */
179         rec->node_flags |= NODE_FLAGS_BANNED;
180
181         if (rec->banned_nodes[pnn] != NULL) {
182                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
183                 talloc_free(rec->banned_nodes[pnn]);
184                 rec->banned_nodes[pnn] = NULL;
185         }
186
187         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
188         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
189
190         rec->banned_nodes[pnn]->rec = rec;
191         rec->banned_nodes[pnn]->banned_node = pnn;
192
193         if (ban_time != 0) {
194                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
195                                 timeval_current_ofs(ban_time, 0),
196                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
197         }
198 }
199
200 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
201
202
203 /*
204   run the "recovered" eventscript on all nodes
205  */
206 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
207 {
208         TALLOC_CTX *tmp_ctx;
209
210         tmp_ctx = talloc_new(ctdb);
211         CTDB_NO_MEMORY(ctdb, tmp_ctx);
212
213         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
214                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
215                         CONTROL_TIMEOUT(), false, tdb_null, NULL) != 0) {
216                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
217                 talloc_free(tmp_ctx);
218                 return -1;
219         }
220
221         talloc_free(tmp_ctx);
222         return 0;
223 }
224
225 /*
226   run the "startrecovery" eventscript on all nodes
227  */
228 static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
229 {
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
236                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
237                         CONTROL_TIMEOUT(), false, tdb_null, NULL) != 0) {
238                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
239                 talloc_free(tmp_ctx);
240                 return -1;
241         }
242
243         talloc_free(tmp_ctx);
244         return 0;
245 }
246
247 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata)
248 {
249         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
250                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %d %p\n", outdata.dsize, outdata.dptr));
251                 return;
252         }
253         ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
254 }
255
256 /*
257   update the node capabilities for all connected nodes
258  */
259 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
260 {
261         uint32_t *nodes;
262         TALLOC_CTX *tmp_ctx;
263
264         tmp_ctx = talloc_new(ctdb);
265         CTDB_NO_MEMORY(ctdb, tmp_ctx);
266
267         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
268
269         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
270                                         nodes, CONTROL_TIMEOUT(),
271                                         false, tdb_null, async_getcap_callback) != 0) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
273                 talloc_free(tmp_ctx);
274                 return -1;
275         }
276
277         talloc_free(tmp_ctx);
278         return 0;
279 }
280
281 /*
282   change recovery mode on all nodes
283  */
284 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
285 {
286         TDB_DATA data;
287         uint32_t *nodes;
288         TALLOC_CTX *tmp_ctx;
289
290         tmp_ctx = talloc_new(ctdb);
291         CTDB_NO_MEMORY(ctdb, tmp_ctx);
292
293         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
294
295         /* freeze all nodes */
296         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
297                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
298                                                 nodes, CONTROL_TIMEOUT(),
299                                                 false, tdb_null, NULL) != 0) {
300                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
301                         talloc_free(tmp_ctx);
302                         return -1;
303                 }
304         }
305
306
307         data.dsize = sizeof(uint32_t);
308         data.dptr = (unsigned char *)&rec_mode;
309
310         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
311                                         nodes, CONTROL_TIMEOUT(),
312                                         false, data, NULL) != 0) {
313                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
314                 talloc_free(tmp_ctx);
315                 return -1;
316         }
317
318         if (rec_mode == CTDB_RECOVERY_NORMAL) {
319                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
320                                                 nodes, CONTROL_TIMEOUT(),
321                                                 false, tdb_null, NULL) != 0) {
322                         DEBUG(DEBUG_ERR, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
323                         talloc_free(tmp_ctx);
324                         return -1;
325                 }
326         }
327
328         talloc_free(tmp_ctx);
329         return 0;
330 }
331
332 /*
333   change recovery master on all node
334  */
335 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
336 {
337         TDB_DATA data;
338         TALLOC_CTX *tmp_ctx;
339
340         tmp_ctx = talloc_new(ctdb);
341         CTDB_NO_MEMORY(ctdb, tmp_ctx);
342
343         data.dsize = sizeof(uint32_t);
344         data.dptr = (unsigned char *)&pnn;
345
346         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
347                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
348                         CONTROL_TIMEOUT(), false, data, NULL) != 0) {
349                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
350                 talloc_free(tmp_ctx);
351                 return -1;
352         }
353
354         talloc_free(tmp_ctx);
355         return 0;
356 }
357
358
359 /*
360   ensure all other nodes have attached to any databases that we have
361  */
362 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
363                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
364 {
365         int i, j, db, ret;
366         struct ctdb_dbid_map *remote_dbmap;
367
368         /* verify that all other nodes have all our databases */
369         for (j=0; j<nodemap->num; j++) {
370                 /* we dont need to ourself ourselves */
371                 if (nodemap->nodes[j].pnn == pnn) {
372                         continue;
373                 }
374                 /* dont check nodes that are unavailable */
375                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
376                         continue;
377                 }
378
379                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
380                                          mem_ctx, &remote_dbmap);
381                 if (ret != 0) {
382                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
383                         return -1;
384                 }
385
386                 /* step through all local databases */
387                 for (db=0; db<dbmap->num;db++) {
388                         const char *name;
389
390
391                         for (i=0;i<remote_dbmap->num;i++) {
392                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
393                                         break;
394                                 }
395                         }
396                         /* the remote node already have this database */
397                         if (i!=remote_dbmap->num) {
398                                 continue;
399                         }
400                         /* ok so we need to create this database */
401                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
402                                             mem_ctx, &name);
403                         if (ret != 0) {
404                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
405                                 return -1;
406                         }
407                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
408                                            mem_ctx, name, dbmap->dbs[db].persistent);
409                         if (ret != 0) {
410                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
411                                 return -1;
412                         }
413                 }
414         }
415
416         return 0;
417 }
418
419
420 /*
421   ensure we are attached to any databases that anyone else is attached to
422  */
423 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
424                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
425 {
426         int i, j, db, ret;
427         struct ctdb_dbid_map *remote_dbmap;
428
429         /* verify that we have all database any other node has */
430         for (j=0; j<nodemap->num; j++) {
431                 /* we dont need to ourself ourselves */
432                 if (nodemap->nodes[j].pnn == pnn) {
433                         continue;
434                 }
435                 /* dont check nodes that are unavailable */
436                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
437                         continue;
438                 }
439
440                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                          mem_ctx, &remote_dbmap);
442                 if (ret != 0) {
443                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
444                         return -1;
445                 }
446
447                 /* step through all databases on the remote node */
448                 for (db=0; db<remote_dbmap->num;db++) {
449                         const char *name;
450
451                         for (i=0;i<(*dbmap)->num;i++) {
452                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
453                                         break;
454                                 }
455                         }
456                         /* we already have this db locally */
457                         if (i!=(*dbmap)->num) {
458                                 continue;
459                         }
460                         /* ok so we need to create this database and
461                            rebuild dbmap
462                          */
463                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
464                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
465                         if (ret != 0) {
466                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
467                                           nodemap->nodes[j].pnn));
468                                 return -1;
469                         }
470                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
471                                            remote_dbmap->dbs[db].persistent);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
474                                 return -1;
475                         }
476                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
477                         if (ret != 0) {
478                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
479                                 return -1;
480                         }
481                 }
482         }
483
484         return 0;
485 }
486
487
488 /*
489   pull the remote database contents from one node into the recdb
490  */
491 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
492                                     struct tdb_wrap *recdb, uint32_t dbid)
493 {
494         int ret;
495         TDB_DATA outdata;
496         struct ctdb_control_pulldb_reply *reply;
497         struct ctdb_rec_data *rec;
498         int i;
499         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
500
501         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
502                                CONTROL_TIMEOUT(), &outdata);
503         if (ret != 0) {
504                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
505                 talloc_free(tmp_ctx);
506                 return -1;
507         }
508
509         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
510
511         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
512                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
513                 talloc_free(tmp_ctx);
514                 return -1;
515         }
516         
517         rec = (struct ctdb_rec_data *)&reply->data[0];
518         
519         for (i=0;
520              i<reply->count;
521              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
522                 TDB_DATA key, data;
523                 struct ctdb_ltdb_header *hdr;
524                 TDB_DATA existing;
525                 
526                 key.dptr = &rec->data[0];
527                 key.dsize = rec->keylen;
528                 data.dptr = &rec->data[key.dsize];
529                 data.dsize = rec->datalen;
530                 
531                 hdr = (struct ctdb_ltdb_header *)data.dptr;
532
533                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
534                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
535                         talloc_free(tmp_ctx);
536                         return -1;
537                 }
538
539                 /* fetch the existing record, if any */
540                 existing = tdb_fetch(recdb->tdb, key);
541                 
542                 if (existing.dptr != NULL) {
543                         struct ctdb_ltdb_header header;
544                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
545                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
546                                          (unsigned)existing.dsize, srcnode));
547                                 free(existing.dptr);
548                                 talloc_free(tmp_ctx);
549                                 return -1;
550                         }
551                         header = *(struct ctdb_ltdb_header *)existing.dptr;
552                         free(existing.dptr);
553                         if (!(header.rsn < hdr->rsn ||
554                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
555                                 continue;
556                         }
557                 }
558                 
559                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
560                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
561                         talloc_free(tmp_ctx);
562                         return -1;                              
563                 }
564         }
565
566         talloc_free(tmp_ctx);
567
568         return 0;
569 }
570
571 /*
572   pull all the remote database contents into the recdb
573  */
574 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
575                                 struct tdb_wrap *recdb, uint32_t dbid)
576 {
577         int j;
578
579         /* pull all records from all other nodes across onto this node
580            (this merges based on rsn)
581         */
582         for (j=0; j<nodemap->num; j++) {
583                 /* dont merge from nodes that are unavailable */
584                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
585                         continue;
586                 }
587                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
588                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
589                                  nodemap->nodes[j].pnn));
590                         return -1;
591                 }
592         }
593         
594         return 0;
595 }
596
597
598 /*
599   update flags on all active nodes
600  */
601 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
602 {
603         int i;
604         for (i=0;i<nodemap->num;i++) {
605                 struct ctdb_node_flag_change c;
606                 TDB_DATA data;
607
608                 c.pnn = nodemap->nodes[i].pnn;
609                 c.old_flags = nodemap->nodes[i].flags;
610                 c.new_flags = nodemap->nodes[i].flags;
611
612                 data.dptr = (uint8_t *)&c;
613                 data.dsize = sizeof(c);
614
615                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
616                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
617
618         }
619         return 0;
620 }
621
622
623 /*
624   ensure all nodes have the same vnnmap we do
625  */
626 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
627                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
628 {
629         int j, ret;
630
631         /* push the new vnn map out to all the nodes */
632         for (j=0; j<nodemap->num; j++) {
633                 /* dont push to nodes that are unavailable */
634                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
635                         continue;
636                 }
637
638                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
639                 if (ret != 0) {
640                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
641                         return -1;
642                 }
643         }
644
645         return 0;
646 }
647
648
649 /*
650   handler for when the admin bans a node
651 */
652 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
653                         TDB_DATA data, void *private_data)
654 {
655         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
656         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
657         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
658
659         if (data.dsize != sizeof(*b)) {
660                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
661                 talloc_free(mem_ctx);
662                 return;
663         }
664
665         if (b->pnn != ctdb->pnn) {
666                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
667                 return;
668         }
669
670         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
671                  b->pnn, b->ban_time));
672
673         ctdb_ban_node(rec, b->pnn, b->ban_time);
674         talloc_free(mem_ctx);
675 }
676
677 /*
678   handler for when the admin unbans a node
679 */
680 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
681                           TDB_DATA data, void *private_data)
682 {
683         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
684         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
685         uint32_t pnn;
686
687         if (data.dsize != sizeof(uint32_t)) {
688                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
689                 talloc_free(mem_ctx);
690                 return;
691         }
692         pnn = *(uint32_t *)data.dptr;
693
694         if (pnn != ctdb->pnn) {
695                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
696                 return;
697         }
698
699         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
700         ctdb_unban_node(rec, pnn);
701         talloc_free(mem_ctx);
702 }
703
704
705 struct vacuum_info {
706         struct vacuum_info *next, *prev;
707         struct ctdb_recoverd *rec;
708         uint32_t srcnode;
709         struct ctdb_db_context *ctdb_db;
710         struct ctdb_control_pulldb_reply *recs;
711         struct ctdb_rec_data *r;
712 };
713
714 static void vacuum_fetch_next(struct vacuum_info *v);
715
716 /*
717   called when a vacuum fetch has completed - just free it and do the next one
718  */
719 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
720 {
721         struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
722         talloc_free(state);
723         vacuum_fetch_next(v);
724 }
725
726
727 /*
728   process the next element from the vacuum list
729 */
730 static void vacuum_fetch_next(struct vacuum_info *v)
731 {
732         struct ctdb_call call;
733         struct ctdb_rec_data *r;
734
735         while (v->recs->count) {
736                 struct ctdb_client_call_state *state;
737                 TDB_DATA data;
738                 struct ctdb_ltdb_header *hdr;
739
740                 ZERO_STRUCT(call);
741                 call.call_id = CTDB_NULL_FUNC;
742                 call.flags = CTDB_IMMEDIATE_MIGRATION;
743
744                 r = v->r;
745                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
746                 v->recs->count--;
747
748                 call.key.dptr = &r->data[0];
749                 call.key.dsize = r->keylen;
750
751                 /* ensure we don't block this daemon - just skip a record if we can't get
752                    the chainlock */
753                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
754                         continue;
755                 }
756
757                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
758                 if (data.dptr == NULL) {
759                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
760                         continue;
761                 }
762
763                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
764                         free(data.dptr);
765                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
766                         continue;
767                 }
768                 
769                 hdr = (struct ctdb_ltdb_header *)data.dptr;
770                 if (hdr->dmaster == v->rec->ctdb->pnn) {
771                         /* its already local */
772                         free(data.dptr);
773                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
774                         continue;
775                 }
776
777                 free(data.dptr);
778
779                 state = ctdb_call_send(v->ctdb_db, &call);
780                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
781                 if (state == NULL) {
782                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
783                         talloc_free(v);
784                         return;
785                 }
786                 state->async.fn = vacuum_fetch_callback;
787                 state->async.private = v;
788                 return;
789         }
790
791         talloc_free(v);
792 }
793
794
795 /*
796   destroy a vacuum info structure
797  */
798 static int vacuum_info_destructor(struct vacuum_info *v)
799 {
800         DLIST_REMOVE(v->rec->vacuum_info, v);
801         return 0;
802 }
803
804
805 /*
806   handler for vacuum fetch
807 */
808 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
809                                  TDB_DATA data, void *private_data)
810 {
811         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
812         struct ctdb_control_pulldb_reply *recs;
813         int ret, i;
814         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
815         const char *name;
816         struct ctdb_dbid_map *dbmap=NULL;
817         bool persistent = false;
818         struct ctdb_db_context *ctdb_db;
819         struct ctdb_rec_data *r;
820         uint32_t srcnode;
821         struct vacuum_info *v;
822
823         recs = (struct ctdb_control_pulldb_reply *)data.dptr;
824         r = (struct ctdb_rec_data *)&recs->data[0];
825
826         if (recs->count == 0) {
827                 return;
828         }
829
830         srcnode = r->reqid;
831
832         for (v=rec->vacuum_info;v;v=v->next) {
833                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
834                         /* we're already working on records from this node */
835                         return;
836                 }
837         }
838
839         /* work out if the database is persistent */
840         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
841         if (ret != 0) {
842                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
843                 talloc_free(tmp_ctx);
844                 return;
845         }
846
847         for (i=0;i<dbmap->num;i++) {
848                 if (dbmap->dbs[i].dbid == recs->db_id) {
849                         persistent = dbmap->dbs[i].persistent;
850                         break;
851                 }
852         }
853         if (i == dbmap->num) {
854                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
855                 talloc_free(tmp_ctx);
856                 return;         
857         }
858
859         /* find the name of this database */
860         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
861                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
862                 talloc_free(tmp_ctx);
863                 return;
864         }
865
866         /* attach to it */
867         ctdb_db = ctdb_attach(ctdb, name, persistent);
868         if (ctdb_db == NULL) {
869                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
870                 talloc_free(tmp_ctx);
871                 return;
872         }
873
874         v = talloc_zero(rec, struct vacuum_info);
875         if (v == NULL) {
876                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
877                 return;
878         }
879
880         v->rec = rec;
881         v->srcnode = srcnode;
882         v->ctdb_db = ctdb_db;
883         v->recs = talloc_memdup(v, recs, data.dsize);
884         if (v->recs == NULL) {
885                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
886                 talloc_free(v);
887                 return;         
888         }
889         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
890
891         DLIST_ADD(rec->vacuum_info, v);
892
893         talloc_set_destructor(v, vacuum_info_destructor);
894
895         vacuum_fetch_next(v);
896 }
897
898
899 /*
900   called when ctdb_wait_timeout should finish
901  */
902 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
903                               struct timeval yt, void *p)
904 {
905         uint32_t *timed_out = (uint32_t *)p;
906         (*timed_out) = 1;
907 }
908
909 /*
910   wait for a given number of seconds
911  */
912 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
913 {
914         uint32_t timed_out = 0;
915         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
916         while (!timed_out) {
917                 event_loop_once(ctdb->ev);
918         }
919 }
920
921 /*
922   called when an election times out (ends)
923  */
924 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
925                                   struct timeval t, void *p)
926 {
927         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
928         rec->election_timeout = NULL;
929 }
930
931
932 /*
933   wait for an election to finish. It finished election_timeout seconds after
934   the last election packet is received
935  */
936 static void ctdb_wait_election(struct ctdb_recoverd *rec)
937 {
938         struct ctdb_context *ctdb = rec->ctdb;
939         while (rec->election_timeout) {
940                 event_loop_once(ctdb->ev);
941         }
942 }
943
944 /*
945   remember the trouble maker
946  */
947 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
948 {
949         struct ctdb_context *ctdb = rec->ctdb;
950
951         if (rec->last_culprit != culprit ||
952             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
953                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
954                 /* either a new node is the culprit, or we've decided to forgive them */
955                 rec->last_culprit = culprit;
956                 rec->first_recover_time = timeval_current();
957                 rec->culprit_counter = 0;
958         }
959         rec->culprit_counter++;
960 }
961
962 /*
963   Update our local flags from all remote connected nodes. 
964   This is only run when we are or we belive we are the recovery master
965  */
966 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
967 {
968         int j;
969         struct ctdb_context *ctdb = rec->ctdb;
970         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
971
972         /* get the nodemap for all active remote nodes and verify
973            they are the same as for this node
974          */
975         for (j=0; j<nodemap->num; j++) {
976                 struct ctdb_node_map *remote_nodemap=NULL;
977                 int ret;
978
979                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
980                         continue;
981                 }
982                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
983                         continue;
984                 }
985
986                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
987                                            mem_ctx, &remote_nodemap);
988                 if (ret != 0) {
989                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
990                                   nodemap->nodes[j].pnn));
991                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
992                         talloc_free(mem_ctx);
993                         return MONITOR_FAILED;
994                 }
995                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
996                         struct ctdb_node_flag_change c;
997                         TDB_DATA data;
998
999                         /* We should tell our daemon about this so it
1000                            updates its flags or else we will log the same 
1001                            message again in the next iteration of recovery.
1002                            Since we are the recovery master we can just as
1003                            well update the flags on all nodes.
1004                         */
1005                         c.pnn = nodemap->nodes[j].pnn;
1006                         c.old_flags = nodemap->nodes[j].flags;
1007                         c.new_flags = remote_nodemap->nodes[j].flags;
1008
1009                         data.dptr = (uint8_t *)&c;
1010                         data.dsize = sizeof(c);
1011
1012                         ctdb_send_message(ctdb, ctdb->pnn,
1013                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
1014                                         data);
1015
1016                         /* Update our local copy of the flags in the recovery
1017                            daemon.
1018                         */
1019                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1020                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1021                                  nodemap->nodes[j].flags));
1022                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1023
1024                         /* If the BANNED flag has changed for the node
1025                            this is a good reason to do a new election.
1026                          */
1027                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
1028                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1029                                  nodemap->nodes[j].pnn, c.new_flags,
1030                                  c.old_flags));
1031                                 talloc_free(mem_ctx);
1032                                 return MONITOR_ELECTION_NEEDED;
1033                         }
1034
1035                 }
1036                 talloc_free(remote_nodemap);
1037         }
1038         talloc_free(mem_ctx);
1039         return MONITOR_OK;
1040 }
1041
1042
1043 /* Create a new random generation ip. 
1044    The generation id can not be the INVALID_GENERATION id
1045 */
1046 static uint32_t new_generation(void)
1047 {
1048         uint32_t generation;
1049
1050         while (1) {
1051                 generation = random();
1052
1053                 if (generation != INVALID_GENERATION) {
1054                         break;
1055                 }
1056         }
1057
1058         return generation;
1059 }
1060
1061
1062 /*
1063   create a temporary working database
1064  */
1065 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1066 {
1067         char *name;
1068         struct tdb_wrap *recdb;
1069
1070         /* open up the temporary recovery database */
1071         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1072         if (name == NULL) {
1073                 return NULL;
1074         }
1075         unlink(name);
1076         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1077                               TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
1078         if (recdb == NULL) {
1079                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1080         }
1081
1082         talloc_free(name);
1083
1084         return recdb;
1085 }
1086
1087
1088 /* 
1089    a traverse function for pulling all relevent records from recdb
1090  */
1091 struct recdb_data {
1092         struct ctdb_context *ctdb;
1093         struct ctdb_control_pulldb_reply *recdata;
1094         uint32_t len;
1095         bool failed;
1096 };
1097
1098 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1099 {
1100         struct recdb_data *params = (struct recdb_data *)p;
1101         struct ctdb_rec_data *rec;
1102         struct ctdb_ltdb_header *hdr;
1103
1104         /* skip empty records */
1105         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1106                 return 0;
1107         }
1108
1109         /* update the dmaster field to point to us */
1110         hdr = (struct ctdb_ltdb_header *)data.dptr;
1111         hdr->dmaster = params->ctdb->pnn;
1112
1113         /* add the record to the blob ready to send to the nodes */
1114         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1115         if (rec == NULL) {
1116                 params->failed = true;
1117                 return -1;
1118         }
1119         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1120         if (params->recdata == NULL) {
1121                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1122                          rec->length + params->len, params->recdata->count));
1123                 params->failed = true;
1124                 return -1;
1125         }
1126         params->recdata->count++;
1127         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1128         params->len += rec->length;
1129         talloc_free(rec);
1130
1131         return 0;
1132 }
1133
1134 /*
1135   push the recdb database out to all nodes
1136  */
1137 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1138                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1139 {
1140         struct recdb_data params;
1141         struct ctdb_control_pulldb_reply *recdata;
1142         TDB_DATA outdata;
1143         TALLOC_CTX *tmp_ctx;
1144
1145         tmp_ctx = talloc_new(ctdb);
1146         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1147
1148         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
1149         CTDB_NO_MEMORY(ctdb, recdata);
1150
1151         recdata->db_id = dbid;
1152
1153         params.ctdb = ctdb;
1154         params.recdata = recdata;
1155         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
1156         params.failed = false;
1157
1158         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1159                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1160                 talloc_free(params.recdata);
1161                 talloc_free(tmp_ctx);
1162                 return -1;
1163         }
1164
1165         if (params.failed) {
1166                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1167                 talloc_free(params.recdata);
1168                 talloc_free(tmp_ctx);
1169                 return -1;              
1170         }
1171
1172         recdata = params.recdata;
1173
1174         outdata.dptr = (void *)recdata;
1175         outdata.dsize = params.len;
1176
1177         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1178                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
1179                         CONTROL_TIMEOUT(), false, outdata, NULL) != 0) {
1180                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1181                 talloc_free(recdata);
1182                 talloc_free(tmp_ctx);
1183                 return -1;
1184         }
1185
1186         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1187                   dbid, recdata->count));
1188
1189         talloc_free(recdata);
1190         talloc_free(tmp_ctx);
1191
1192         return 0;
1193 }
1194
1195
1196 /*
1197   go through a full recovery on one database 
1198  */
1199 static int recover_database(struct ctdb_recoverd *rec, 
1200                             TALLOC_CTX *mem_ctx,
1201                             uint32_t dbid,
1202                             uint32_t pnn, 
1203                             struct ctdb_node_map *nodemap,
1204                             uint32_t transaction_id)
1205 {
1206         struct tdb_wrap *recdb;
1207         int ret;
1208         struct ctdb_context *ctdb = rec->ctdb;
1209         TDB_DATA data;
1210         struct ctdb_control_wipe_database w;
1211
1212         recdb = create_recdb(ctdb, mem_ctx);
1213         if (recdb == NULL) {
1214                 return -1;
1215         }
1216
1217         /* pull all remote databases onto the recdb */
1218         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1219         if (ret != 0) {
1220                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1221                 return -1;
1222         }
1223
1224         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1225
1226         /* wipe all the remote databases. This is safe as we are in a transaction */
1227         w.db_id = dbid;
1228         w.transaction_id = transaction_id;
1229
1230         data.dptr = (void *)&w;
1231         data.dsize = sizeof(w);
1232
1233         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1234                         list_of_active_nodes(ctdb, nodemap, recdb, true),
1235                         CONTROL_TIMEOUT(), false, data, NULL) != 0) {
1236                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1237                 talloc_free(recdb);
1238                 return -1;
1239         }
1240         
1241         /* push out the correct database. This sets the dmaster and skips 
1242            the empty records */
1243         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1244         if (ret != 0) {
1245                 talloc_free(recdb);
1246                 return -1;
1247         }
1248
1249         /* all done with this database */
1250         talloc_free(recdb);
1251
1252         return 0;
1253 }
1254
1255                 
1256 /*
1257   we are the recmaster, and recovery is needed - start a recovery run
1258  */
1259 static int do_recovery(struct ctdb_recoverd *rec, 
1260                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1261                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1262                        uint32_t culprit)
1263 {
1264         struct ctdb_context *ctdb = rec->ctdb;
1265         int i, j, ret;
1266         uint32_t generation;
1267         struct ctdb_dbid_map *dbmap;
1268         TDB_DATA data;
1269
1270         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1271
1272         /* if recovery fails, force it again */
1273         rec->need_recovery = true;
1274
1275         ctdb_set_culprit(rec, culprit);
1276
1277         if (rec->culprit_counter > 2*nodemap->num) {
1278                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1279                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1280                          ctdb->tunable.recovery_ban_period));
1281                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1282         }
1283
1284         if (!ctdb_recovery_lock(ctdb, true)) {
1285                 ctdb_set_culprit(rec, pnn);
1286                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1287                 return -1;
1288         }
1289
1290         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1291
1292         /* get a list of all databases */
1293         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1296                 return -1;
1297         }
1298
1299         /* we do the db creation before we set the recovery mode, so the freeze happens
1300            on all databases we will be dealing with. */
1301
1302         /* verify that we have all the databases any other node has */
1303         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1304         if (ret != 0) {
1305                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1306                 return -1;
1307         }
1308
1309         /* verify that all other nodes have all our databases */
1310         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1311         if (ret != 0) {
1312                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1313                 return -1;
1314         }
1315
1316         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1317
1318
1319         /* set recovery mode to active on all nodes */
1320         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1321         if (ret!=0) {
1322                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1323                 return -1;
1324         }
1325
1326         /* execute the "startrecovery" event script on all nodes */
1327         ret = run_startrecovery_eventscript(ctdb, nodemap);
1328         if (ret!=0) {
1329                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1330                 return -1;
1331         }
1332
1333         /* pick a new generation number */
1334         generation = new_generation();
1335
1336         /* change the vnnmap on this node to use the new generation 
1337            number but not on any other nodes.
1338            this guarantees that if we abort the recovery prematurely
1339            for some reason (a node stops responding?)
1340            that we can just return immediately and we will reenter
1341            recovery shortly again.
1342            I.e. we deliberately leave the cluster with an inconsistent
1343            generation id to allow us to abort recovery at any stage and
1344            just restart it from scratch.
1345          */
1346         vnnmap->generation = generation;
1347         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1348         if (ret != 0) {
1349                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1350                 return -1;
1351         }
1352
1353         data.dptr = (void *)&generation;
1354         data.dsize = sizeof(uint32_t);
1355
1356         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1357                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1358                         CONTROL_TIMEOUT(), false, data, NULL) != 0) {
1359                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1360                 return -1;
1361         }
1362
1363         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1364
1365         for (i=0;i<dbmap->num;i++) {
1366                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1367                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1368                         return -1;
1369                 }
1370         }
1371
1372         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1373
1374         /* commit all the changes */
1375         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1376                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1377                         CONTROL_TIMEOUT(), false, data, NULL) != 0) {
1378                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1379                 return -1;
1380         }
1381
1382         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1383         
1384
1385         /* update the capabilities for all nodes */
1386         ret = update_capabilities(ctdb, nodemap);
1387         if (ret!=0) {
1388                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1389                 return -1;
1390         }
1391
1392         /* build a new vnn map with all the currently active and
1393            unbanned nodes */
1394         generation = new_generation();
1395         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1396         CTDB_NO_MEMORY(ctdb, vnnmap);
1397         vnnmap->generation = generation;
1398         vnnmap->size = 0;
1399         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1400         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1401         for (i=j=0;i<nodemap->num;i++) {
1402                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1403                         continue;
1404                 }
1405                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1406                         /* this node can not be an lmaster */
1407                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1408                         continue;
1409                 }
1410
1411                 vnnmap->size++;
1412                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1413                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1414                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1415
1416         }
1417         if (vnnmap->size == 0) {
1418                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1419                 vnnmap->size++;
1420                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1421                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1422                 vnnmap->map[0] = pnn;
1423         }       
1424
1425         /* update to the new vnnmap on all nodes */
1426         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1427         if (ret != 0) {
1428                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1429                 return -1;
1430         }
1431
1432         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1433
1434         /* update recmaster to point to us for all nodes */
1435         ret = set_recovery_master(ctdb, nodemap, pnn);
1436         if (ret!=0) {
1437                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1438                 return -1;
1439         }
1440
1441         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1442
1443         /*
1444           update all nodes to have the same flags that we have
1445          */
1446         ret = update_flags_on_all_nodes(ctdb, nodemap);
1447         if (ret != 0) {
1448                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
1449                 return -1;
1450         }
1451         
1452         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1453
1454         /*
1455           if enabled, tell nodes to takeover their public IPs
1456          */
1457         if (ctdb->vnn) {
1458                 rec->need_takeover_run = false;
1459                 ret = ctdb_takeover_run(ctdb, nodemap);
1460                 if (ret != 0) {
1461                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1462                         return -1;
1463                 }
1464                 DEBUG(DEBUG_INFO, (__location__ " Recovery - done takeover\n"));
1465         }
1466
1467         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1468
1469         /* execute the "recovered" event script on all nodes */
1470         ret = run_recovered_eventscript(ctdb, nodemap);
1471         if (ret!=0) {
1472                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
1473                 return -1;
1474         }
1475
1476         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1477
1478         /* disable recovery mode */
1479         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1480         if (ret!=0) {
1481                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1482                 return -1;
1483         }
1484
1485         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1486
1487         /* send a message to all clients telling them that the cluster 
1488            has been reconfigured */
1489         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1490
1491         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1492
1493         rec->need_recovery = false;
1494
1495         /* We just finished a recovery successfully. 
1496            We now wait for rerecovery_timeout before we allow 
1497            another recovery to take place.
1498         */
1499         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1500         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1501         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1502
1503         return 0;
1504 }
1505
1506
1507 /*
1508   elections are won by first checking the number of connected nodes, then
1509   the priority time, then the pnn
1510  */
1511 struct election_message {
1512         uint32_t num_connected;
1513         struct timeval priority_time;
1514         uint32_t pnn;
1515         uint32_t node_flags;
1516 };
1517
1518 /*
1519   form this nodes election data
1520  */
1521 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1522 {
1523         int ret, i;
1524         struct ctdb_node_map *nodemap;
1525         struct ctdb_context *ctdb = rec->ctdb;
1526
1527         ZERO_STRUCTP(em);
1528
1529         em->pnn = rec->ctdb->pnn;
1530         em->priority_time = rec->priority_time;
1531         em->node_flags = rec->node_flags;
1532
1533         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1534         if (ret != 0) {
1535                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1536                 return;
1537         }
1538
1539         for (i=0;i<nodemap->num;i++) {
1540                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1541                         em->num_connected++;
1542                 }
1543         }
1544
1545         /* we shouldnt try to win this election if we cant be a recmaster */
1546         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1547                 em->num_connected = 0;
1548                 em->priority_time = timeval_current();
1549         }
1550
1551         talloc_free(nodemap);
1552 }
1553
1554 /*
1555   see if the given election data wins
1556  */
1557 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1558 {
1559         struct election_message myem;
1560         int cmp = 0;
1561
1562         ctdb_election_data(rec, &myem);
1563
1564         /* we cant win if we dont have the recmaster capability */
1565         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1566                 return false;
1567         }
1568
1569         /* we cant win if we are banned */
1570         if (rec->node_flags & NODE_FLAGS_BANNED) {
1571                 return false;
1572         }       
1573
1574         /* we will automatically win if the other node is banned */
1575         if (em->node_flags & NODE_FLAGS_BANNED) {
1576                 return true;
1577         }
1578
1579         /* try to use the most connected node */
1580         if (cmp == 0) {
1581                 cmp = (int)myem.num_connected - (int)em->num_connected;
1582         }
1583
1584         /* then the longest running node */
1585         if (cmp == 0) {
1586                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1587         }
1588
1589         if (cmp == 0) {
1590                 cmp = (int)myem.pnn - (int)em->pnn;
1591         }
1592
1593         return cmp > 0;
1594 }
1595
1596 /*
1597   send out an election request
1598  */
1599 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1600 {
1601         int ret;
1602         TDB_DATA election_data;
1603         struct election_message emsg;
1604         uint64_t srvid;
1605         struct ctdb_context *ctdb = rec->ctdb;
1606
1607         srvid = CTDB_SRVID_RECOVERY;
1608
1609         ctdb_election_data(rec, &emsg);
1610
1611         election_data.dsize = sizeof(struct election_message);
1612         election_data.dptr  = (unsigned char *)&emsg;
1613
1614
1615         /* first we assume we will win the election and set 
1616            recoverymaster to be ourself on the current node
1617          */
1618         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1619         if (ret != 0) {
1620                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1621                 return -1;
1622         }
1623
1624
1625         /* send an election message to all active nodes */
1626         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1627
1628         return 0;
1629 }
1630
1631 /*
1632   this function will unban all nodes in the cluster
1633 */
1634 static void unban_all_nodes(struct ctdb_context *ctdb)
1635 {
1636         int ret, i;
1637         struct ctdb_node_map *nodemap;
1638         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1639         
1640         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1641         if (ret != 0) {
1642                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1643                 return;
1644         }
1645
1646         for (i=0;i<nodemap->num;i++) {
1647                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1648                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1649                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1650                 }
1651         }
1652
1653         talloc_free(tmp_ctx);
1654 }
1655
1656
1657 /*
1658   we think we are winning the election - send a broadcast election request
1659  */
1660 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1661 {
1662         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1663         int ret;
1664
1665         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1666         if (ret != 0) {
1667                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1668         }
1669
1670         talloc_free(rec->send_election_te);
1671         rec->send_election_te = NULL;
1672 }
1673
1674 /*
1675   handler for memory dumps
1676 */
1677 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1678                              TDB_DATA data, void *private_data)
1679 {
1680         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1681         TDB_DATA *dump;
1682         int ret;
1683         struct rd_memdump_reply *rd;
1684
1685         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1686                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1687                 return;
1688         }
1689         rd = (struct rd_memdump_reply *)data.dptr;
1690
1691         dump = talloc_zero(tmp_ctx, TDB_DATA);
1692         if (dump == NULL) {
1693                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1694                 talloc_free(tmp_ctx);
1695                 return;
1696         }
1697         ret = ctdb_dump_memory(ctdb, dump);
1698         if (ret != 0) {
1699                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1700                 talloc_free(tmp_ctx);
1701                 return;
1702         }
1703
1704 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1705
1706         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1707         if (ret != 0) {
1708                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1709                 return;
1710         }
1711
1712         talloc_free(tmp_ctx);
1713 }
1714
1715 /*
1716   handler for recovery master elections
1717 */
1718 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1719                              TDB_DATA data, void *private_data)
1720 {
1721         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1722         int ret;
1723         struct election_message *em = (struct election_message *)data.dptr;
1724         TALLOC_CTX *mem_ctx;
1725
1726         /* we got an election packet - update the timeout for the election */
1727         talloc_free(rec->election_timeout);
1728         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1729                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1730                                                 ctdb_election_timeout, rec);
1731
1732         mem_ctx = talloc_new(ctdb);
1733
1734         /* someone called an election. check their election data
1735            and if we disagree and we would rather be the elected node, 
1736            send a new election message to all other nodes
1737          */
1738         if (ctdb_election_win(rec, em)) {
1739                 if (!rec->send_election_te) {
1740                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1741                                                                 timeval_current_ofs(0, 500000),
1742                                                                 election_send_request, rec);
1743                 }
1744                 talloc_free(mem_ctx);
1745                 /*unban_all_nodes(ctdb);*/
1746                 return;
1747         }
1748         
1749         /* we didn't win */
1750         talloc_free(rec->send_election_te);
1751         rec->send_election_te = NULL;
1752
1753         /* release the recmaster lock */
1754         if (em->pnn != ctdb->pnn &&
1755             ctdb->recovery_lock_fd != -1) {
1756                 close(ctdb->recovery_lock_fd);
1757                 ctdb->recovery_lock_fd = -1;
1758                 unban_all_nodes(ctdb);
1759         }
1760
1761         /* ok, let that guy become recmaster then */
1762         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1765                 talloc_free(mem_ctx);
1766                 return;
1767         }
1768
1769         /* release any bans */
1770         rec->last_culprit = (uint32_t)-1;
1771         talloc_free(rec->banned_nodes);
1772         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1773         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1774
1775         talloc_free(mem_ctx);
1776         return;
1777 }
1778
1779
1780 /*
1781   force the start of the election process
1782  */
1783 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1784                            struct ctdb_node_map *nodemap)
1785 {
1786         int ret;
1787         struct ctdb_context *ctdb = rec->ctdb;
1788
1789         /* set all nodes to recovery mode to stop all internode traffic */
1790         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1791         if (ret!=0) {
1792                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1793                 return;
1794         }
1795
1796         talloc_free(rec->election_timeout);
1797         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1798                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1799                                                 ctdb_election_timeout, rec);
1800
1801         ret = send_election_request(rec, pnn);
1802         if (ret!=0) {
1803                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1804                 return;
1805         }
1806
1807         /* wait for a few seconds to collect all responses */
1808         ctdb_wait_election(rec);
1809 }
1810
1811
1812
1813 /*
1814   handler for when a node changes its flags
1815 */
1816 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1817                             TDB_DATA data, void *private_data)
1818 {
1819         int ret;
1820         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1821         struct ctdb_node_map *nodemap=NULL;
1822         TALLOC_CTX *tmp_ctx;
1823         uint32_t changed_flags;
1824         int i;
1825         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1826
1827         if (data.dsize != sizeof(*c)) {
1828                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1829                 return;
1830         }
1831
1832         tmp_ctx = talloc_new(ctdb);
1833         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1834
1835         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1836         if (ret != 0) {
1837                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1838                 talloc_free(tmp_ctx);
1839                 return;         
1840         }
1841
1842
1843         for (i=0;i<nodemap->num;i++) {
1844                 if (nodemap->nodes[i].pnn == c->pnn) break;
1845         }
1846
1847         if (i == nodemap->num) {
1848                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1849                 talloc_free(tmp_ctx);
1850                 return;
1851         }
1852
1853         changed_flags = c->old_flags ^ c->new_flags;
1854
1855         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1856            This flag is handled locally based on whether the local node
1857            can communicate with the node or not.
1858         */
1859         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1860         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1861                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1862         }
1863
1864         if (nodemap->nodes[i].flags != c->new_flags) {
1865                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1866         }
1867
1868         nodemap->nodes[i].flags = c->new_flags;
1869
1870         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1871                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1872
1873         if (ret == 0) {
1874                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1875                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1876         }
1877         
1878         if (ret == 0 &&
1879             ctdb->recovery_master == ctdb->pnn &&
1880             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1881             ctdb->vnn) {
1882                 /* Only do the takeover run if the perm disabled or unhealthy
1883                    flags changed since these will cause an ip failover but not
1884                    a recovery.
1885                    If the node became disconnected or banned this will also
1886                    lead to an ip address failover but that is handled 
1887                    during recovery
1888                 */
1889                 if (changed_flags & NODE_FLAGS_DISABLED) {
1890                         rec->need_takeover_run = true;
1891                 }
1892         }
1893
1894         talloc_free(tmp_ctx);
1895 }
1896
1897
1898
1899 struct verify_recmode_normal_data {
1900         uint32_t count;
1901         enum monitor_result status;
1902 };
1903
1904 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1905 {
1906         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1907
1908
1909         /* one more node has responded with recmode data*/
1910         rmdata->count--;
1911
1912         /* if we failed to get the recmode, then return an error and let
1913            the main loop try again.
1914         */
1915         if (state->state != CTDB_CONTROL_DONE) {
1916                 if (rmdata->status == MONITOR_OK) {
1917                         rmdata->status = MONITOR_FAILED;
1918                 }
1919                 return;
1920         }
1921
1922         /* if we got a response, then the recmode will be stored in the
1923            status field
1924         */
1925         if (state->status != CTDB_RECOVERY_NORMAL) {
1926                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1927                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1928         }
1929
1930         return;
1931 }
1932
1933
1934 /* verify that all nodes are in normal recovery mode */
1935 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1936 {
1937         struct verify_recmode_normal_data *rmdata;
1938         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1939         struct ctdb_client_control_state *state;
1940         enum monitor_result status;
1941         int j;
1942         
1943         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1944         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1945         rmdata->count  = 0;
1946         rmdata->status = MONITOR_OK;
1947
1948         /* loop over all active nodes and send an async getrecmode call to 
1949            them*/
1950         for (j=0; j<nodemap->num; j++) {
1951                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1952                         continue;
1953                 }
1954                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1955                                         CONTROL_TIMEOUT(), 
1956                                         nodemap->nodes[j].pnn);
1957                 if (state == NULL) {
1958                         /* we failed to send the control, treat this as 
1959                            an error and try again next iteration
1960                         */                      
1961                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1962                         talloc_free(mem_ctx);
1963                         return MONITOR_FAILED;
1964                 }
1965
1966                 /* set up the callback functions */
1967                 state->async.fn = verify_recmode_normal_callback;
1968                 state->async.private_data = rmdata;
1969
1970                 /* one more control to wait for to complete */
1971                 rmdata->count++;
1972         }
1973
1974
1975         /* now wait for up to the maximum number of seconds allowed
1976            or until all nodes we expect a response from has replied
1977         */
1978         while (rmdata->count > 0) {
1979                 event_loop_once(ctdb->ev);
1980         }
1981
1982         status = rmdata->status;
1983         talloc_free(mem_ctx);
1984         return status;
1985 }
1986
1987
1988 struct verify_recmaster_data {
1989         struct ctdb_recoverd *rec;
1990         uint32_t count;
1991         uint32_t pnn;
1992         enum monitor_result status;
1993 };
1994
1995 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1996 {
1997         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1998
1999
2000         /* one more node has responded with recmaster data*/
2001         rmdata->count--;
2002
2003         /* if we failed to get the recmaster, then return an error and let
2004            the main loop try again.
2005         */
2006         if (state->state != CTDB_CONTROL_DONE) {
2007                 if (rmdata->status == MONITOR_OK) {
2008                         rmdata->status = MONITOR_FAILED;
2009                 }
2010                 return;
2011         }
2012
2013         /* if we got a response, then the recmaster will be stored in the
2014            status field
2015         */
2016         if (state->status != rmdata->pnn) {
2017                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2018                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2019                 rmdata->status = MONITOR_ELECTION_NEEDED;
2020         }
2021
2022         return;
2023 }
2024
2025
2026 /* verify that all nodes agree that we are the recmaster */
2027 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2028 {
2029         struct ctdb_context *ctdb = rec->ctdb;
2030         struct verify_recmaster_data *rmdata;
2031         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2032         struct ctdb_client_control_state *state;
2033         enum monitor_result status;
2034         int j;
2035         
2036         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2037         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2038         rmdata->rec    = rec;
2039         rmdata->count  = 0;
2040         rmdata->pnn    = pnn;
2041         rmdata->status = MONITOR_OK;
2042
2043         /* loop over all active nodes and send an async getrecmaster call to 
2044            them*/
2045         for (j=0; j<nodemap->num; j++) {
2046                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2047                         continue;
2048                 }
2049                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2050                                         CONTROL_TIMEOUT(),
2051                                         nodemap->nodes[j].pnn);
2052                 if (state == NULL) {
2053                         /* we failed to send the control, treat this as 
2054                            an error and try again next iteration
2055                         */                      
2056                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2057                         talloc_free(mem_ctx);
2058                         return MONITOR_FAILED;
2059                 }
2060
2061                 /* set up the callback functions */
2062                 state->async.fn = verify_recmaster_callback;
2063                 state->async.private_data = rmdata;
2064
2065                 /* one more control to wait for to complete */
2066                 rmdata->count++;
2067         }
2068
2069
2070         /* now wait for up to the maximum number of seconds allowed
2071            or until all nodes we expect a response from has replied
2072         */
2073         while (rmdata->count > 0) {
2074                 event_loop_once(ctdb->ev);
2075         }
2076
2077         status = rmdata->status;
2078         talloc_free(mem_ctx);
2079         return status;
2080 }
2081
2082 /*
2083   this function writes the number of connected nodes we have for this pnn
2084   to the pnn slot in the reclock file
2085 */
2086 static void
2087 ctdb_recoverd_write_pnn_connect_count(struct ctdb_recoverd *rec)
2088 {
2089         const char count = rec->num_connected;
2090         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
2091
2092         if (rec->rec_file_fd == -1) {
2093                 DEBUG(DEBUG_CRIT,(__location__ " Unable to write pnn count. pnnfile is not open.\n"));
2094                 return;
2095         } 
2096
2097         if (pwrite(rec->rec_file_fd, &count, 1, ctdb->pnn) == -1) {
2098                 DEBUG(DEBUG_CRIT, (__location__ " Failed to write pnn count\n"));
2099                 close(rec->rec_file_fd);
2100                 rec->rec_file_fd = -1;
2101         }
2102 }
2103
2104 /* 
2105   this function opens the reclock file and sets a byterage lock for the single
2106   byte at position pnn+1.
2107   the existence/non-existence of such a lock provides an alternative mechanism
2108   to know whether a remote node(recovery daemon) is running or not.
2109 */
2110 static void
2111 ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
2112 {
2113         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
2114         struct flock lock;
2115         char *pnnfile = NULL;
2116
2117         DEBUG(DEBUG_INFO, ("Setting PNN lock for pnn:%d\n", ctdb->pnn));
2118
2119         if (rec->rec_file_fd != -1) {
2120                 close(rec->rec_file_fd);
2121                 rec->rec_file_fd = -1;
2122         }
2123
2124         pnnfile = talloc_asprintf(rec, "%s.pnn", ctdb->recovery_lock_file);
2125         CTDB_NO_MEMORY_FATAL(ctdb, pnnfile);
2126
2127         rec->rec_file_fd = open(pnnfile, O_RDWR|O_CREAT, 0600);
2128         if (rec->rec_file_fd == -1) {
2129                 DEBUG(DEBUG_CRIT,(__location__ " Unable to open %s - (%s)\n", 
2130                          pnnfile, strerror(errno)));
2131                 talloc_free(pnnfile);
2132                 return;
2133         }
2134
2135         set_close_on_exec(rec->rec_file_fd);
2136         lock.l_type = F_WRLCK;
2137         lock.l_whence = SEEK_SET;
2138         lock.l_start = ctdb->pnn;
2139         lock.l_len = 1;
2140         lock.l_pid = 0;
2141
2142         if (fcntl(rec->rec_file_fd, F_SETLK, &lock) != 0) {
2143                 close(rec->rec_file_fd);
2144                 rec->rec_file_fd = -1;
2145                 DEBUG(DEBUG_CRIT,(__location__ " Failed to get pnn lock on '%s'\n", pnnfile));
2146                 talloc_free(pnnfile);
2147                 return;
2148         }
2149
2150
2151         DEBUG(DEBUG_NOTICE,(__location__ " Got pnn lock on '%s'\n", pnnfile));
2152         talloc_free(pnnfile);
2153
2154         /* we start out with 0 connected nodes */
2155         ctdb_recoverd_write_pnn_connect_count(rec);
2156 }
2157
2158 /*
2159   called when we need to do the periodical reclock pnn count update
2160  */
2161 static void ctdb_update_pnn_count(struct event_context *ev, struct timed_event *te, 
2162                                   struct timeval t, void *p)
2163 {
2164         int i, count;
2165         struct ctdb_recoverd *rec     = talloc_get_type(p, struct ctdb_recoverd);
2166         struct ctdb_context *ctdb     = rec->ctdb;
2167         struct ctdb_node_map *nodemap = rec->nodemap;
2168
2169         /* close and reopen the pnn lock file */
2170         ctdb_recoverd_get_pnn_lock(rec);
2171
2172         ctdb_recoverd_write_pnn_connect_count(rec);
2173
2174         event_add_timed(rec->ctdb->ev, rec->ctdb,
2175                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2176                 ctdb_update_pnn_count, rec);
2177
2178         /* check if there is a split cluster and yeld the recmaster role
2179            it the other half of the cluster is larger 
2180         */
2181         DEBUG(DEBUG_DEBUG, ("CHECK FOR SPLIT CLUSTER\n"));
2182         if (rec->nodemap == NULL) {
2183                 return;
2184         }
2185         if (rec->rec_file_fd == -1) {
2186                 return;
2187         }
2188         /* only test this if we think we are the recmaster */
2189         if (ctdb->pnn != rec->recmaster) {
2190                 DEBUG(DEBUG_DEBUG, ("We are not recmaster, skip test\n"));
2191                 return;
2192         }
2193         if (ctdb->recovery_lock_fd == -1) {
2194                 DEBUG(DEBUG_ERR, (__location__ " Lost reclock pnn file. Yielding recmaster role\n"));
2195                 close(ctdb->recovery_lock_fd);
2196                 ctdb->recovery_lock_fd = -1;
2197                 force_election(rec, ctdb->pnn, rec->nodemap);
2198                 return;
2199         }
2200         for (i=0; i<nodemap->num; i++) {
2201                 /* we dont need to check ourself */
2202                 if (nodemap->nodes[i].pnn == ctdb->pnn) {
2203                         continue;
2204                 }
2205                 /* dont check nodes that are connected to us */
2206                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2207                         continue;
2208                 }
2209                 /* check if the node is "connected" and how connected it it */
2210                 count = ctdb_read_pnn_lock(rec->rec_file_fd, nodemap->nodes[i].pnn);
2211                 if (count < 0) {
2212                         continue;
2213                 }
2214                 /* check if that node is more connected that us */
2215                 if (count > rec->num_connected) {
2216                         DEBUG(DEBUG_ERR, ("DISCONNECTED Node %u is more connected than we are, yielding recmaster role\n", nodemap->nodes[i].pnn));
2217                         close(ctdb->recovery_lock_fd);
2218                         ctdb->recovery_lock_fd = -1;
2219                         force_election(rec, ctdb->pnn, rec->nodemap);
2220                         return;
2221                 }
2222         }
2223 }
2224
2225 /*
2226   the main monitoring loop
2227  */
2228 static void monitor_cluster(struct ctdb_context *ctdb)
2229 {
2230         uint32_t pnn;
2231         TALLOC_CTX *mem_ctx=NULL;
2232         struct ctdb_node_map *nodemap=NULL;
2233         struct ctdb_node_map *remote_nodemap=NULL;
2234         struct ctdb_vnn_map *vnnmap=NULL;
2235         struct ctdb_vnn_map *remote_vnnmap=NULL;
2236         int32_t debug_level;
2237         int i, j, ret;
2238         struct ctdb_recoverd *rec;
2239         struct ctdb_all_public_ips *ips;
2240         char c;
2241
2242         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2243
2244         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2245         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2246
2247         rec->ctdb = ctdb;
2248         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2249         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2250
2251         rec->priority_time = timeval_current();
2252
2253         /* open the rec file fd and lock our slot */
2254         rec->rec_file_fd = -1;
2255         ctdb_recoverd_get_pnn_lock(rec);
2256
2257         /* register a message port for sending memory dumps */
2258         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2259
2260         /* register a message port for recovery elections */
2261         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2262
2263         /* and one for when nodes are disabled/enabled */
2264         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
2265
2266         /* and one for when nodes are banned */
2267         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2268
2269         /* and one for when nodes are unbanned */
2270         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2271
2272         /* register a message port for vacuum fetch */
2273         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2274
2275         /* update the reclock pnn file connected count on a regular basis */
2276         event_add_timed(ctdb->ev, ctdb,
2277                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2278                 ctdb_update_pnn_count, rec);
2279
2280 again:
2281         if (mem_ctx) {
2282                 talloc_free(mem_ctx);
2283                 mem_ctx = NULL;
2284         }
2285         mem_ctx = talloc_new(ctdb);
2286         if (!mem_ctx) {
2287                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2288                 exit(-1);
2289         }
2290
2291         /* we only check for recovery once every second */
2292         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2293
2294         /* verify that the main daemon is still running */
2295         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2296                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2297                 exit(-1);
2298         }
2299
2300         if (rec->election_timeout) {
2301                 /* an election is in progress */
2302                 goto again;
2303         }
2304
2305         /* read the debug level from the parent and update locally */
2306         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2307         if (ret !=0) {
2308                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2309                 goto again;
2310         }
2311         LogLevel = debug_level;
2312
2313
2314         /* We must check if we need to ban a node here but we want to do this
2315            as early as possible so we dont wait until we have pulled the node
2316            map from the local node. thats why we have the hardcoded value 20
2317         */
2318         if (rec->culprit_counter > 20) {
2319                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2320                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2321                          ctdb->tunable.recovery_ban_period));
2322                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2323         }
2324
2325         /* get relevant tunables */
2326         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2327         if (ret != 0) {
2328                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2329                 goto again;
2330         }
2331
2332         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2333         if (pnn == (uint32_t)-1) {
2334                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2335                 goto again;
2336         }
2337
2338         /* get the vnnmap */
2339         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2340         if (ret != 0) {
2341                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2342                 goto again;
2343         }
2344
2345
2346         /* get number of nodes */
2347         if (rec->nodemap) {
2348                 talloc_free(rec->nodemap);
2349                 rec->nodemap = NULL;
2350                 nodemap=NULL;
2351         }
2352         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2353         if (ret != 0) {
2354                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2355                 goto again;
2356         }
2357         nodemap = rec->nodemap;
2358
2359         /* check which node is the recovery master */
2360         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2361         if (ret != 0) {
2362                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2363                 goto again;
2364         }
2365
2366         if (rec->recmaster == (uint32_t)-1) {
2367                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2368                 force_election(rec, pnn, nodemap);
2369                 goto again;
2370         }
2371         
2372         /* check that we (recovery daemon) and the local ctdb daemon
2373            agrees on whether we are banned or not
2374         */
2375         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2376                 if (rec->banned_nodes[pnn] == NULL) {
2377                         if (rec->recmaster == pnn) {
2378                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2379
2380                                 ctdb_unban_node(rec, pnn);
2381                         } else {
2382                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2383                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2384                                 ctdb_set_culprit(rec, pnn);
2385                         }
2386                         goto again;
2387                 }
2388         } else {
2389                 if (rec->banned_nodes[pnn] != NULL) {
2390                         if (rec->recmaster == pnn) {
2391                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2392
2393                                 ctdb_unban_node(rec, pnn);
2394                         } else {
2395                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2396
2397                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2398                                 ctdb_set_culprit(rec, pnn);
2399                         }
2400                         goto again;
2401                 }
2402         }
2403
2404         /* remember our own node flags */
2405         rec->node_flags = nodemap->nodes[pnn].flags;
2406
2407         /* count how many active nodes there are */
2408         rec->num_active    = 0;
2409         rec->num_connected = 0;
2410         for (i=0; i<nodemap->num; i++) {
2411                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2412                         rec->num_active++;
2413                 }
2414                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2415                         rec->num_connected++;
2416                 }
2417         }
2418
2419
2420         /* verify that the recmaster node is still active */
2421         for (j=0; j<nodemap->num; j++) {
2422                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2423                         break;
2424                 }
2425         }
2426
2427         if (j == nodemap->num) {
2428                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2429                 force_election(rec, pnn, nodemap);
2430                 goto again;
2431         }
2432
2433         /* if recovery master is disconnected we must elect a new recmaster */
2434         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2435                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2436                 force_election(rec, pnn, nodemap);
2437                 goto again;
2438         }
2439
2440         /* grap the nodemap from the recovery master to check if it is banned */
2441         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2442                                    mem_ctx, &remote_nodemap);
2443         if (ret != 0) {
2444                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2445                           nodemap->nodes[j].pnn));
2446                 goto again;
2447         }
2448
2449
2450         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2451                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2452                 force_election(rec, pnn, nodemap);
2453                 goto again;
2454         }
2455
2456         /* verify that the public ip address allocation is consistent */
2457         if (ctdb->vnn != NULL) {
2458                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2459                 if (ret != 0) {
2460                         DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", i));
2461                         goto again;
2462                 }
2463                 for (j=0; j<ips->num; j++) {
2464                         /* verify that we have the ip addresses we should have
2465                            and we dont have ones we shouldnt have.
2466                            if we find an inconsistency we set recmode to
2467                            active on the local node and wait for the recmaster
2468                            to do a full blown recovery
2469                         */
2470                         if (ips->ips[j].pnn == pnn) {
2471                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
2472                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2473                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2474                                         if (ret != 0) {
2475                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2476                                                 goto again;
2477                                         }
2478                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2479                                         if (ret != 0) {
2480                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2481                                                 goto again;
2482                                         }
2483                                 }
2484                         } else {
2485                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
2486                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2487                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2488                                         if (ret != 0) {
2489                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2490                                                 goto again;
2491                                         }
2492                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2493                                         if (ret != 0) {
2494                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2495                                                 goto again;
2496                                         }
2497                                 }
2498                         }
2499                 }
2500         }
2501
2502         /* if we are not the recmaster then we do not need to check
2503            if recovery is needed
2504          */
2505         if (pnn != rec->recmaster) {
2506                 goto again;
2507         }
2508
2509
2510         /* ensure our local copies of flags are right */
2511         ret = update_local_flags(rec, nodemap);
2512         if (ret == MONITOR_ELECTION_NEEDED) {
2513                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2514                 force_election(rec, pnn, nodemap);
2515                 goto again;
2516         }
2517         if (ret != MONITOR_OK) {
2518                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2519                 goto again;
2520         }
2521
2522         /* update the list of public ips that a node can handle for
2523            all connected nodes
2524         */
2525         for (j=0; j<nodemap->num; j++) {
2526                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2527                         continue;
2528                 }
2529                 /* release any existing data */
2530                 if (ctdb->nodes[j]->public_ips) {
2531                         talloc_free(ctdb->nodes[j]->public_ips);
2532                         ctdb->nodes[j]->public_ips = NULL;
2533                 }
2534                 /* grab a new shiny list of public ips from the node */
2535                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2536                         ctdb->nodes[j]->pnn, 
2537                         ctdb->nodes,
2538                         &ctdb->nodes[j]->public_ips)) {
2539                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2540                                 ctdb->nodes[j]->pnn));
2541                         goto again;
2542                 }
2543         }
2544
2545
2546         /* verify that all active nodes agree that we are the recmaster */
2547         switch (verify_recmaster(rec, nodemap, pnn)) {
2548         case MONITOR_RECOVERY_NEEDED:
2549                 /* can not happen */
2550                 goto again;
2551         case MONITOR_ELECTION_NEEDED:
2552                 force_election(rec, pnn, nodemap);
2553                 goto again;
2554         case MONITOR_OK:
2555                 break;
2556         case MONITOR_FAILED:
2557                 goto again;
2558         }
2559
2560
2561         if (rec->need_recovery) {
2562                 /* a previous recovery didn't finish */
2563                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2564                 goto again;             
2565         }
2566
2567         /* verify that all active nodes are in normal mode 
2568            and not in recovery mode 
2569          */
2570         switch (verify_recmode(ctdb, nodemap)) {
2571         case MONITOR_RECOVERY_NEEDED:
2572                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2573                 goto again;
2574         case MONITOR_FAILED:
2575                 goto again;
2576         case MONITOR_ELECTION_NEEDED:
2577                 /* can not happen */
2578         case MONITOR_OK:
2579                 break;
2580         }
2581
2582
2583         /* we should have the reclock - check its not stale */
2584         if (ctdb->recovery_lock_fd == -1) {
2585                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2586                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2587                 goto again;
2588         }
2589
2590         if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
2591                 DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2592                 close(ctdb->recovery_lock_fd);
2593                 ctdb->recovery_lock_fd = -1;
2594                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2595                 goto again;
2596         }
2597
2598         /* get the nodemap for all active remote nodes and verify
2599            they are the same as for this node
2600          */
2601         for (j=0; j<nodemap->num; j++) {
2602                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2603                         continue;
2604                 }
2605                 if (nodemap->nodes[j].pnn == pnn) {
2606                         continue;
2607                 }
2608
2609                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2610                                            mem_ctx, &remote_nodemap);
2611                 if (ret != 0) {
2612                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
2613                                   nodemap->nodes[j].pnn));
2614                         goto again;
2615                 }
2616
2617                 /* if the nodes disagree on how many nodes there are
2618                    then this is a good reason to try recovery
2619                  */
2620                 if (remote_nodemap->num != nodemap->num) {
2621                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2622                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2623                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2624                         goto again;
2625                 }
2626
2627                 /* if the nodes disagree on which nodes exist and are
2628                    active, then that is also a good reason to do recovery
2629                  */
2630                 for (i=0;i<nodemap->num;i++) {
2631                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2632                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2633                                           nodemap->nodes[j].pnn, i, 
2634                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2635                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2636                                             vnnmap, nodemap->nodes[j].pnn);
2637                                 goto again;
2638                         }
2639                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2640                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2641                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2642                                           nodemap->nodes[j].pnn, i,
2643                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2644                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2645                                             vnnmap, nodemap->nodes[j].pnn);
2646                                 goto again;
2647                         }
2648                 }
2649
2650         }
2651
2652
2653         /* there better be the same number of lmasters in the vnn map
2654            as there are active nodes or we will have to do a recovery
2655          */
2656         if (vnnmap->size != rec->num_active) {
2657                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2658                           vnnmap->size, rec->num_active));
2659                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2660                 goto again;
2661         }
2662
2663         /* verify that all active nodes in the nodemap also exist in 
2664            the vnnmap.
2665          */
2666         for (j=0; j<nodemap->num; j++) {
2667                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2668                         continue;
2669                 }
2670                 if (nodemap->nodes[j].pnn == pnn) {
2671                         continue;
2672                 }
2673
2674                 for (i=0; i<vnnmap->size; i++) {
2675                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2676                                 break;
2677                         }
2678                 }
2679                 if (i == vnnmap->size) {
2680                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2681                                   nodemap->nodes[j].pnn));
2682                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2683                         goto again;
2684                 }
2685         }
2686
2687         
2688         /* verify that all other nodes have the same vnnmap
2689            and are from the same generation
2690          */
2691         for (j=0; j<nodemap->num; j++) {
2692                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2693                         continue;
2694                 }
2695                 if (nodemap->nodes[j].pnn == pnn) {
2696                         continue;
2697                 }
2698
2699                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2700                                           mem_ctx, &remote_vnnmap);
2701                 if (ret != 0) {
2702                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2703                                   nodemap->nodes[j].pnn));
2704                         goto again;
2705                 }
2706
2707                 /* verify the vnnmap generation is the same */
2708                 if (vnnmap->generation != remote_vnnmap->generation) {
2709                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2710                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2711                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2712                         goto again;
2713                 }
2714
2715                 /* verify the vnnmap size is the same */
2716                 if (vnnmap->size != remote_vnnmap->size) {
2717                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2718                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2719                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2720                         goto again;
2721                 }
2722
2723                 /* verify the vnnmap is the same */
2724                 for (i=0;i<vnnmap->size;i++) {
2725                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2726                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2727                                           nodemap->nodes[j].pnn));
2728                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2729                                             vnnmap, nodemap->nodes[j].pnn);
2730                                 goto again;
2731                         }
2732                 }
2733         }
2734
2735         /* we might need to change who has what IP assigned */
2736         if (rec->need_takeover_run) {
2737                 rec->need_takeover_run = false;
2738
2739                 /* execute the "startrecovery" event script on all nodes */
2740                 ret = run_startrecovery_eventscript(ctdb, nodemap);
2741                 if (ret!=0) {
2742                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2743                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2744                                     vnnmap, ctdb->pnn);
2745                 }
2746
2747                 ret = ctdb_takeover_run(ctdb, nodemap);
2748                 if (ret != 0) {
2749                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2750                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2751                                     vnnmap, ctdb->pnn);
2752                 }
2753
2754                 /* execute the "recovered" event script on all nodes */
2755                 ret = run_recovered_eventscript(ctdb, nodemap);
2756                 if (ret!=0) {
2757                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
2758                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2759                                     vnnmap, ctdb->pnn);
2760                 }
2761         }
2762
2763         goto again;
2764
2765 }
2766
2767 /*
2768   event handler for when the main ctdbd dies
2769  */
2770 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2771                                  uint16_t flags, void *private_data)
2772 {
2773         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
2774         _exit(1);
2775 }
2776
2777 /*
2778   called regularly to verify that the recovery daemon is still running
2779  */
2780 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
2781                               struct timeval yt, void *p)
2782 {
2783         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2784
2785         /* make sure we harvest the child if signals are blocked for some
2786            reason
2787         */
2788         waitpid(ctdb->recoverd_pid, 0, WNOHANG);
2789
2790         if (kill(ctdb->recoverd_pid, 0) != 0) {
2791                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
2792
2793                 ctdb_stop_recoverd(ctdb);
2794                 ctdb_stop_keepalive(ctdb);
2795                 ctdb_stop_monitoring(ctdb);
2796                 ctdb_release_all_ips(ctdb);
2797                 ctdb->methods->shutdown(ctdb);
2798                 ctdb_event_script(ctdb, "shutdown");
2799
2800                 exit(10);       
2801         }
2802
2803         event_add_timed(ctdb->ev, ctdb, 
2804                         timeval_current_ofs(30, 0),
2805                         ctdb_check_recd, ctdb);
2806 }
2807
2808 /*
2809   startup the recovery daemon as a child of the main ctdb daemon
2810  */
2811 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2812 {
2813         int ret;
2814         int fd[2];
2815
2816         if (pipe(fd) != 0) {
2817                 return -1;
2818         }
2819
2820         ctdb->ctdbd_pid = getpid();
2821
2822         ctdb->recoverd_pid = fork();
2823         if (ctdb->recoverd_pid == -1) {
2824                 return -1;
2825         }
2826         
2827         if (ctdb->recoverd_pid != 0) {
2828                 close(fd[0]);
2829                 event_add_timed(ctdb->ev, ctdb, 
2830                                 timeval_current_ofs(30, 0),
2831                                 ctdb_check_recd, ctdb);
2832                 return 0;
2833         }
2834
2835         close(fd[1]);
2836
2837         /* shutdown the transport */
2838         ctdb->methods->shutdown(ctdb);
2839
2840         /* get a new event context */
2841         talloc_free(ctdb->ev);
2842         ctdb->ev = event_context_init(ctdb);
2843
2844         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2845                      ctdb_recoverd_parent, &fd[0]);     
2846
2847         close(ctdb->daemon.sd);
2848         ctdb->daemon.sd = -1;
2849
2850         srandom(getpid() ^ time(NULL));
2851
2852         /* the recovery daemon does not need to be realtime */
2853         if (ctdb->do_setsched) {
2854                 ctdb_restore_scheduler(ctdb);
2855         }
2856
2857         /* initialise ctdb */
2858         ret = ctdb_socket_connect(ctdb);
2859         if (ret != 0) {
2860                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
2861                 exit(1);
2862         }
2863
2864         monitor_cluster(ctdb);
2865
2866         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
2867         return -1;
2868 }
2869
2870 /*
2871   shutdown the recovery daemon
2872  */
2873 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2874 {
2875         if (ctdb->recoverd_pid == 0) {
2876                 return;
2877         }
2878
2879         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
2880         kill(ctdb->recoverd_pid, SIGTERM);
2881 }