force recovery if unable to tell a node to release an IP
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45 };
46
47 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
48 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
49
50 /*
51   unban a node
52  */
53 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
54 {
55         struct ctdb_context *ctdb = rec->ctdb;
56
57         if (!ctdb_validate_pnn(ctdb, pnn)) {
58                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
59                 return;
60         }
61
62         if (rec->banned_nodes[pnn] == NULL) {
63                 return;
64         }
65
66         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
67
68         talloc_free(rec->banned_nodes[pnn]);
69         rec->banned_nodes[pnn] = NULL;
70 }
71
72
73 /*
74   called when a ban has timed out
75  */
76 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
77 {
78         struct ban_state *state = talloc_get_type(p, struct ban_state);
79         struct ctdb_recoverd *rec = state->rec;
80         uint32_t pnn = state->banned_node;
81
82         DEBUG(0,("Node %u is now unbanned\n", pnn));
83         ctdb_unban_node(rec, pnn);
84 }
85
86 /*
87   ban a node for a period of time
88  */
89 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
90 {
91         struct ctdb_context *ctdb = rec->ctdb;
92
93         if (!ctdb_validate_pnn(ctdb, pnn)) {
94                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
95                 return;
96         }
97
98         if (pnn == ctdb->pnn) {
99                 DEBUG(0,("self ban - lowering our election priority\n"));
100                 /* banning ourselves - lower our election priority */
101                 rec->priority_time = timeval_current();
102         }
103
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
105
106         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
107         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
108
109         rec->banned_nodes[pnn]->rec = rec;
110         rec->banned_nodes[pnn]->banned_node = pnn;
111
112         if (ban_time != 0) {
113                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
114                                 timeval_current_ofs(ban_time, 0),
115                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
116         }
117 }
118
119 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
120
121
122 struct freeze_node_data {
123         uint32_t count;
124         enum monitor_result status;
125 };
126
127
128 static void freeze_node_callback(struct ctdb_client_control_state *state)
129 {
130         struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
131
132
133         /* one more node has responded to our freeze node*/
134         fndata->count--;
135
136         /* if we failed to freeze the node, we must trigger another recovery */
137         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
138                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
139                 fndata->status = MONITOR_RECOVERY_NEEDED;
140         }
141
142         return;
143 }
144
145
146
147 /* freeze all nodes */
148 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
149 {
150         struct freeze_node_data *fndata;
151         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
152         struct ctdb_client_control_state *state;
153         enum monitor_result status;
154         int j;
155         
156         fndata = talloc(mem_ctx, struct freeze_node_data);
157         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
158         fndata->count  = 0;
159         fndata->status = MONITOR_OK;
160
161         /* loop over all active nodes and send an async freeze call to 
162            them*/
163         for (j=0; j<nodemap->num; j++) {
164                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
165                         continue;
166                 }
167                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
168                                         CONTROL_TIMEOUT(), 
169                                         nodemap->nodes[j].pnn);
170                 if (state == NULL) {
171                         /* we failed to send the control, treat this as 
172                            an error and try again next iteration
173                         */                      
174                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
175                         talloc_free(mem_ctx);
176                         return MONITOR_RECOVERY_NEEDED;
177                 }
178
179                 /* set up the callback functions */
180                 state->async.fn = freeze_node_callback;
181                 state->async.private = fndata;
182
183                 /* one more control to wait for to complete */
184                 fndata->count++;
185         }
186
187
188         /* now wait for up to the maximum number of seconds allowed
189            or until all nodes we expect a response from has replied
190         */
191         while (fndata->count > 0) {
192                 event_loop_once(ctdb->ev);
193         }
194
195         status = fndata->status;
196         talloc_free(mem_ctx);
197         return status;
198 }
199
200
201 /*
202   change recovery mode on all nodes
203  */
204 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
205 {
206         int j, ret;
207
208         /* freeze all nodes */
209         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
210                 ret = freeze_all_nodes(ctdb, nodemap);
211                 if (ret != MONITOR_OK) {
212                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
213                         return -1;
214                 }
215         }
216
217
218         /* set recovery mode to active on all nodes */
219         for (j=0; j<nodemap->num; j++) {
220                 /* dont change it for nodes that are unavailable */
221                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
222                         continue;
223                 }
224
225                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
226                 if (ret != 0) {
227                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
228                         return -1;
229                 }
230
231                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
232                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
233                         if (ret != 0) {
234                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
235                                 return -1;
236                         }
237                 }
238         }
239
240         return 0;
241 }
242
243 /*
244   change recovery master on all node
245  */
246 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
247 {
248         int j, ret;
249
250         /* set recovery master to pnn on all nodes */
251         for (j=0; j<nodemap->num; j++) {
252                 /* dont change it for nodes that are unavailable */
253                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
254                         continue;
255                 }
256
257                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
258                 if (ret != 0) {
259                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
260                         return -1;
261                 }
262         }
263
264         return 0;
265 }
266
267
268 /*
269   ensure all other nodes have attached to any databases that we have
270  */
271 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
272                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
273 {
274         int i, j, db, ret;
275         struct ctdb_dbid_map *remote_dbmap;
276
277         /* verify that all other nodes have all our databases */
278         for (j=0; j<nodemap->num; j++) {
279                 /* we dont need to ourself ourselves */
280                 if (nodemap->nodes[j].pnn == pnn) {
281                         continue;
282                 }
283                 /* dont check nodes that are unavailable */
284                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
285                         continue;
286                 }
287
288                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
289                                          mem_ctx, &remote_dbmap);
290                 if (ret != 0) {
291                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
292                         return -1;
293                 }
294
295                 /* step through all local databases */
296                 for (db=0; db<dbmap->num;db++) {
297                         const char *name;
298
299
300                         for (i=0;i<remote_dbmap->num;i++) {
301                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
302                                         break;
303                                 }
304                         }
305                         /* the remote node already have this database */
306                         if (i!=remote_dbmap->num) {
307                                 continue;
308                         }
309                         /* ok so we need to create this database */
310                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
311                         if (ret != 0) {
312                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
313                                 return -1;
314                         }
315                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
316                         if (ret != 0) {
317                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
318                                 return -1;
319                         }
320                 }
321         }
322
323         return 0;
324 }
325
326
327 /*
328   ensure we are attached to any databases that anyone else is attached to
329  */
330 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
331                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
332 {
333         int i, j, db, ret;
334         struct ctdb_dbid_map *remote_dbmap;
335
336         /* verify that we have all database any other node has */
337         for (j=0; j<nodemap->num; j++) {
338                 /* we dont need to ourself ourselves */
339                 if (nodemap->nodes[j].pnn == pnn) {
340                         continue;
341                 }
342                 /* dont check nodes that are unavailable */
343                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
344                         continue;
345                 }
346
347                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
348                                          mem_ctx, &remote_dbmap);
349                 if (ret != 0) {
350                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
351                         return -1;
352                 }
353
354                 /* step through all databases on the remote node */
355                 for (db=0; db<remote_dbmap->num;db++) {
356                         const char *name;
357
358                         for (i=0;i<(*dbmap)->num;i++) {
359                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
360                                         break;
361                                 }
362                         }
363                         /* we already have this db locally */
364                         if (i!=(*dbmap)->num) {
365                                 continue;
366                         }
367                         /* ok so we need to create this database and
368                            rebuild dbmap
369                          */
370                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
371                                             remote_dbmap->dbids[db], mem_ctx, &name);
372                         if (ret != 0) {
373                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
374                                           nodemap->nodes[j].pnn));
375                                 return -1;
376                         }
377                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
378                         if (ret != 0) {
379                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
380                                 return -1;
381                         }
382                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
383                         if (ret != 0) {
384                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
385                                 return -1;
386                         }
387                 }
388         }
389
390         return 0;
391 }
392
393
394 /*
395   pull all the remote database contents into ours
396  */
397 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
398                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
399 {
400         int i, j, ret;
401
402         /* pull all records from all other nodes across onto this node
403            (this merges based on rsn)
404         */
405         for (i=0;i<dbmap->num;i++) {
406                 for (j=0; j<nodemap->num; j++) {
407                         /* we dont need to merge with ourselves */
408                         if (nodemap->nodes[j].pnn == pnn) {
409                                 continue;
410                         }
411                         /* dont merge from nodes that are unavailable */
412                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
413                                 continue;
414                         }
415                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
416                                                pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
417                         if (ret != 0) {
418                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
419                                           nodemap->nodes[j].pnn, pnn));
420                                 return -1;
421                         }
422                 }
423         }
424
425         return 0;
426 }
427
428
429 /*
430   change the dmaster on all databases to point to us
431  */
432 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
433                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
434 {
435         int i, j, ret;
436
437         /* update dmaster to point to this node for all databases/nodes */
438         for (i=0;i<dbmap->num;i++) {
439                 for (j=0; j<nodemap->num; j++) {
440                         /* dont repoint nodes that are unavailable */
441                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
442                                 continue;
443                         }
444                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
445                         if (ret != 0) {
446                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
447                                 return -1;
448                         }
449                 }
450         }
451
452         return 0;
453 }
454
455
456 /*
457   update flags on all active nodes
458  */
459 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
460 {
461         int i;
462         for (i=0;i<nodemap->num;i++) {
463                 struct ctdb_node_flag_change c;
464                 TDB_DATA data;
465
466                 c.pnn = nodemap->nodes[i].pnn;
467                 c.old_flags = nodemap->nodes[i].flags;
468                 c.new_flags = nodemap->nodes[i].flags;
469
470                 data.dptr = (uint8_t *)&c;
471                 data.dsize = sizeof(c);
472
473                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
474                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
475
476         }
477         return 0;
478 }
479
480 /*
481   vacuum one database
482  */
483 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
484 {
485         uint64_t max_rsn;
486         int ret, i;
487
488         /* find max rsn on our local node for this db */
489         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
490         if (ret != 0) {
491                 return -1;
492         }
493
494         /* set rsn on non-empty records to max_rsn+1 */
495         for (i=0;i<nodemap->num;i++) {
496                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
497                         continue;
498                 }
499                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
500                                                  db_id, max_rsn+1);
501                 if (ret != 0) {
502                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
503                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
504                         return -1;
505                 }
506         }
507
508         /* delete records with rsn < max_rsn+1 on all nodes */
509         for (i=0;i<nodemap->num;i++) {
510                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
511                         continue;
512                 }
513                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
514                                                  db_id, max_rsn+1);
515                 if (ret != 0) {
516                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
517                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
518                         return -1;
519                 }
520         }
521
522
523         return 0;
524 }
525
526
527 /*
528   vacuum all attached databases
529  */
530 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
531                                 struct ctdb_dbid_map *dbmap)
532 {
533         int i;
534
535         /* update dmaster to point to this node for all databases/nodes */
536         for (i=0;i<dbmap->num;i++) {
537                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
538                         return -1;
539                 }
540         }
541         return 0;
542 }
543
544
545 /*
546   push out all our database contents to all other nodes
547  */
548 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
549                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
550 {
551         int i, j, ret;
552
553         /* push all records out to the nodes again */
554         for (i=0;i<dbmap->num;i++) {
555                 for (j=0; j<nodemap->num; j++) {
556                         /* we dont need to push to ourselves */
557                         if (nodemap->nodes[j].pnn == pnn) {
558                                 continue;
559                         }
560                         /* dont push to nodes that are unavailable */
561                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
562                                 continue;
563                         }
564                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
565                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
566                         if (ret != 0) {
567                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
568                                           pnn, nodemap->nodes[j].pnn));
569                                 return -1;
570                         }
571                 }
572         }
573
574         return 0;
575 }
576
577
578 /*
579   ensure all nodes have the same vnnmap we do
580  */
581 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
582                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
583 {
584         int j, ret;
585
586         /* push the new vnn map out to all the nodes */
587         for (j=0; j<nodemap->num; j++) {
588                 /* dont push to nodes that are unavailable */
589                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
590                         continue;
591                 }
592
593                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
594                 if (ret != 0) {
595                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
596                         return -1;
597                 }
598         }
599
600         return 0;
601 }
602
603
604 /*
605   handler for when the admin bans a node
606 */
607 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
608                         TDB_DATA data, void *private_data)
609 {
610         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
611         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
612         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
613         uint32_t recmaster;
614         int ret;
615
616         if (data.dsize != sizeof(*b)) {
617                 DEBUG(0,("Bad data in ban_handler\n"));
618                 talloc_free(mem_ctx);
619                 return;
620         }
621
622         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
623         if (ret != 0) {
624                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
625                 talloc_free(mem_ctx);
626                 return;
627         }
628
629         if (recmaster != ctdb->pnn) {
630                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
631                 talloc_free(mem_ctx);
632                 return;
633         }
634
635         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
636                  b->pnn, b->ban_time));
637         ctdb_ban_node(rec, b->pnn, b->ban_time);
638         talloc_free(mem_ctx);
639 }
640
641 /*
642   handler for when the admin unbans a node
643 */
644 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
645                           TDB_DATA data, void *private_data)
646 {
647         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
648         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
649         uint32_t pnn;
650         int ret;
651         uint32_t recmaster;
652
653         if (data.dsize != sizeof(uint32_t)) {
654                 DEBUG(0,("Bad data in unban_handler\n"));
655                 talloc_free(mem_ctx);
656                 return;
657         }
658         pnn = *(uint32_t *)data.dptr;
659
660         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
661         if (ret != 0) {
662                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
663                 talloc_free(mem_ctx);
664                 return;
665         }
666
667         if (recmaster != ctdb->pnn) {
668                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
669                 talloc_free(mem_ctx);
670                 return;
671         }
672
673         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
674         ctdb_unban_node(rec, pnn);
675         talloc_free(mem_ctx);
676 }
677
678
679
680 /*
681   called when ctdb_wait_timeout should finish
682  */
683 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
684                               struct timeval yt, void *p)
685 {
686         uint32_t *timed_out = (uint32_t *)p;
687         (*timed_out) = 1;
688 }
689
690 /*
691   wait for a given number of seconds
692  */
693 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
694 {
695         uint32_t timed_out = 0;
696         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
697         while (!timed_out) {
698                 event_loop_once(ctdb->ev);
699         }
700 }
701
702 /* Create a new random generation ip. 
703    The generation id can not be the INVALID_GENERATION id
704 */
705 static uint32_t new_generation(void)
706 {
707         uint32_t generation;
708
709         while (1) {
710                 generation = random();
711
712                 if (generation != INVALID_GENERATION) {
713                         break;
714                 }
715         }
716
717         return generation;
718 }
719                 
720 /*
721   we are the recmaster, and recovery is needed - start a recovery run
722  */
723 static int do_recovery(struct ctdb_recoverd *rec, 
724                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
725                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
726                        uint32_t culprit)
727 {
728         struct ctdb_context *ctdb = rec->ctdb;
729         int i, j, ret;
730         uint32_t generation;
731         struct ctdb_dbid_map *dbmap;
732
733         if (rec->last_culprit != culprit ||
734             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
735                 /* either a new node is the culprit, or we've decide to forgive them */
736                 rec->last_culprit = culprit;
737                 rec->first_recover_time = timeval_current();
738                 rec->culprit_counter = 0;
739         }
740         rec->culprit_counter++;
741
742         if (rec->culprit_counter > 2*nodemap->num) {
743                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
744                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
745                          ctdb->tunable.recovery_ban_period));
746                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
747         }
748
749         if (!ctdb_recovery_lock(ctdb, true)) {
750                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
751                 return -1;
752         }
753
754         /* set recovery mode to active on all nodes */
755         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
756         if (ret!=0) {
757                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
758                 return -1;
759         }
760
761         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
762
763         /* pick a new generation number */
764         generation = new_generation();
765
766         /* change the vnnmap on this node to use the new generation 
767            number but not on any other nodes.
768            this guarantees that if we abort the recovery prematurely
769            for some reason (a node stops responding?)
770            that we can just return immediately and we will reenter
771            recovery shortly again.
772            I.e. we deliberately leave the cluster with an inconsistent
773            generation id to allow us to abort recovery at any stage and
774            just restart it from scratch.
775          */
776         vnnmap->generation = generation;
777         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
778         if (ret != 0) {
779                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
780                 return -1;
781         }
782
783         /* get a list of all databases */
784         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
785         if (ret != 0) {
786                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
787                 return -1;
788         }
789
790
791
792         /* verify that all other nodes have all our databases */
793         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
794         if (ret != 0) {
795                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
796                 return -1;
797         }
798
799         /* verify that we have all the databases any other node has */
800         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
801         if (ret != 0) {
802                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
803                 return -1;
804         }
805
806
807
808         /* verify that all other nodes have all our databases */
809         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
810         if (ret != 0) {
811                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
812                 return -1;
813         }
814
815
816         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
817
818         /* pull all remote databases onto the local node */
819         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
820         if (ret != 0) {
821                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
822                 return -1;
823         }
824
825         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
826
827         /* push all local databases to the remote nodes */
828         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
829         if (ret != 0) {
830                 DEBUG(0, (__location__ " Unable to push local databases\n"));
831                 return -1;
832         }
833
834         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
835
836         /* build a new vnn map with all the currently active and
837            unbanned nodes */
838         generation = new_generation();
839         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
840         CTDB_NO_MEMORY(ctdb, vnnmap);
841         vnnmap->generation = generation;
842         vnnmap->size = num_active;
843         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
844         for (i=j=0;i<nodemap->num;i++) {
845                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
846                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
847                 }
848         }
849
850
851
852         /* update to the new vnnmap on all nodes */
853         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
854         if (ret != 0) {
855                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
856                 return -1;
857         }
858
859         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
860
861         /* update recmaster to point to us for all nodes */
862         ret = set_recovery_master(ctdb, nodemap, pnn);
863         if (ret!=0) {
864                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
865                 return -1;
866         }
867
868         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
869
870         /* repoint all local and remote database records to the local
871            node as being dmaster
872          */
873         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
874         if (ret != 0) {
875                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
876                 return -1;
877         }
878
879         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
880
881         /*
882           update all nodes to have the same flags that we have
883          */
884         ret = update_flags_on_all_nodes(ctdb, nodemap);
885         if (ret != 0) {
886                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
887                 return -1;
888         }
889         
890         DEBUG(1, (__location__ " Recovery - updated flags\n"));
891
892         /*
893           run a vacuum operation on empty records
894          */
895         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
896         if (ret != 0) {
897                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
898                 return -1;
899         }
900
901         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
902
903         /*
904           if enabled, tell nodes to takeover their public IPs
905          */
906         if (ctdb->vnn) {
907                 ret = ctdb_takeover_run(ctdb, nodemap);
908                 if (ret != 0) {
909                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
910                         return -1;
911                 }
912                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
913         }
914
915
916         /* disable recovery mode */
917         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
918         if (ret!=0) {
919                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
920                 return -1;
921         }
922
923         /* send a message to all clients telling them that the cluster 
924            has been reconfigured */
925         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
926
927         DEBUG(0, (__location__ " Recovery complete\n"));
928
929         /* We just finished a recovery successfully. 
930            We now wait for rerecovery_timeout before we allow 
931            another recovery to take place.
932         */
933         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
934         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
935         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
936
937         return 0;
938 }
939
940
941 /*
942   elections are won by first checking the number of connected nodes, then
943   the priority time, then the pnn
944  */
945 struct election_message {
946         uint32_t num_connected;
947         struct timeval priority_time;
948         uint32_t pnn;
949 };
950
951 /*
952   form this nodes election data
953  */
954 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
955 {
956         int ret, i;
957         struct ctdb_node_map *nodemap;
958         struct ctdb_context *ctdb = rec->ctdb;
959
960         ZERO_STRUCTP(em);
961
962         em->pnn = rec->ctdb->pnn;
963         em->priority_time = rec->priority_time;
964
965         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
966         if (ret != 0) {
967                 return;
968         }
969
970         for (i=0;i<nodemap->num;i++) {
971                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
972                         em->num_connected++;
973                 }
974         }
975         talloc_free(nodemap);
976 }
977
978 /*
979   see if the given election data wins
980  */
981 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
982 {
983         struct election_message myem;
984         int cmp;
985
986         ctdb_election_data(rec, &myem);
987
988         /* try to use the most connected node */
989         cmp = (int)myem.num_connected - (int)em->num_connected;
990
991         /* then the longest running node */
992         if (cmp == 0) {
993                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
994         }
995
996         if (cmp == 0) {
997                 cmp = (int)myem.pnn - (int)em->pnn;
998         }
999
1000         return cmp > 0;
1001 }
1002
1003 /*
1004   send out an election request
1005  */
1006 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1007 {
1008         int ret;
1009         TDB_DATA election_data;
1010         struct election_message emsg;
1011         uint64_t srvid;
1012         struct ctdb_context *ctdb = rec->ctdb;
1013         
1014         srvid = CTDB_SRVID_RECOVERY;
1015
1016         ctdb_election_data(rec, &emsg);
1017
1018         election_data.dsize = sizeof(struct election_message);
1019         election_data.dptr  = (unsigned char *)&emsg;
1020
1021
1022         /* first we assume we will win the election and set 
1023            recoverymaster to be ourself on the current node
1024          */
1025         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1026         if (ret != 0) {
1027                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1028                 return -1;
1029         }
1030
1031
1032         /* send an election message to all active nodes */
1033         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1034
1035         return 0;
1036 }
1037
1038 /*
1039   this function will unban all nodes in the cluster
1040 */
1041 static void unban_all_nodes(struct ctdb_context *ctdb)
1042 {
1043         int ret, i;
1044         struct ctdb_node_map *nodemap;
1045         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1046         
1047         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1048         if (ret != 0) {
1049                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1050                 return;
1051         }
1052
1053         for (i=0;i<nodemap->num;i++) {
1054                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1055                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1056                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1057                 }
1058         }
1059
1060         talloc_free(tmp_ctx);
1061 }
1062
1063 /*
1064   handler for recovery master elections
1065 */
1066 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1067                              TDB_DATA data, void *private_data)
1068 {
1069         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1070         int ret;
1071         struct election_message *em = (struct election_message *)data.dptr;
1072         TALLOC_CTX *mem_ctx;
1073
1074         mem_ctx = talloc_new(ctdb);
1075
1076         /* someone called an election. check their election data
1077            and if we disagree and we would rather be the elected node, 
1078            send a new election message to all other nodes
1079          */
1080         if (ctdb_election_win(rec, em)) {
1081                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1082                 if (ret!=0) {
1083                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1084                 }
1085                 talloc_free(mem_ctx);
1086                 /*unban_all_nodes(ctdb);*/
1087                 return;
1088         }
1089
1090         /* release the recmaster lock */
1091         if (em->pnn != ctdb->pnn &&
1092             ctdb->recovery_lock_fd != -1) {
1093                 close(ctdb->recovery_lock_fd);
1094                 ctdb->recovery_lock_fd = -1;
1095                 unban_all_nodes(ctdb);
1096         }
1097
1098         /* ok, let that guy become recmaster then */
1099         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1100         if (ret != 0) {
1101                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1102                 talloc_free(mem_ctx);
1103                 return;
1104         }
1105
1106         /* release any bans */
1107         rec->last_culprit = (uint32_t)-1;
1108         talloc_free(rec->banned_nodes);
1109         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1110         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1111
1112         talloc_free(mem_ctx);
1113         return;
1114 }
1115
1116
1117 /*
1118   force the start of the election process
1119  */
1120 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1121                            struct ctdb_node_map *nodemap)
1122 {
1123         int ret;
1124         struct ctdb_context *ctdb = rec->ctdb;
1125
1126         /* set all nodes to recovery mode to stop all internode traffic */
1127         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1128         if (ret!=0) {
1129                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1130                 return;
1131         }
1132         
1133         ret = send_election_request(rec, mem_ctx, pnn);
1134         if (ret!=0) {
1135                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1136                 return;
1137         }
1138
1139         /* wait for a few seconds to collect all responses */
1140         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1141 }
1142
1143
1144
1145 /*
1146   handler for when a node changes its flags
1147 */
1148 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1149                             TDB_DATA data, void *private_data)
1150 {
1151         int ret;
1152         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1153         struct ctdb_node_map *nodemap=NULL;
1154         TALLOC_CTX *tmp_ctx;
1155         uint32_t changed_flags;
1156         int i;
1157
1158         if (data.dsize != sizeof(*c)) {
1159                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1160                 return;
1161         }
1162
1163         tmp_ctx = talloc_new(ctdb);
1164         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1165
1166         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1167
1168         for (i=0;i<nodemap->num;i++) {
1169                 if (nodemap->nodes[i].pnn == c->pnn) break;
1170         }
1171
1172         if (i == nodemap->num) {
1173                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1174                 talloc_free(tmp_ctx);
1175                 return;
1176         }
1177
1178         changed_flags = c->old_flags ^ c->new_flags;
1179
1180         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1181            This flag is handled locally based on whether the local node
1182            can communicate with the node or not.
1183         */
1184         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1185         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1186                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1187         }
1188
1189         if (nodemap->nodes[i].flags != c->new_flags) {
1190                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1191         }
1192
1193         nodemap->nodes[i].flags = c->new_flags;
1194
1195         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1196                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1197
1198         if (ret == 0) {
1199                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1200                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1201         }
1202         
1203         if (ret == 0 &&
1204             ctdb->recovery_master == ctdb->pnn &&
1205             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1206             ctdb->vnn) {
1207                 /* Only do the takeover run if the perm disabled or unhealthy
1208                    flags changed since these will cause an ip failover but not
1209                    a recovery.
1210                    If the node became disconnected or banned this will also
1211                    lead to an ip address failover but that is handled 
1212                    during recovery
1213                 */
1214                 if (changed_flags & NODE_FLAGS_DISABLED) {
1215                         ret = ctdb_takeover_run(ctdb, nodemap);
1216                         if (ret != 0) {
1217                                 DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1218                                 ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), 
1219                                                      ctdb->pnn, CTDB_RECOVERY_ACTIVE);
1220                         }
1221                         /* send a message to all clients telling them that the 
1222                            cluster has been reconfigured */
1223                         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1224                 }
1225         }
1226
1227         talloc_free(tmp_ctx);
1228 }
1229
1230
1231
1232 struct verify_recmode_normal_data {
1233         uint32_t count;
1234         enum monitor_result status;
1235 };
1236
1237 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1238 {
1239         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
1240
1241
1242         /* one more node has responded with recmode data*/
1243         rmdata->count--;
1244
1245         /* if we failed to get the recmode, then return an error and let
1246            the main loop try again.
1247         */
1248         if (state->state != CTDB_CONTROL_DONE) {
1249                 if (rmdata->status == MONITOR_OK) {
1250                         rmdata->status = MONITOR_FAILED;
1251                 }
1252                 return;
1253         }
1254
1255         /* if we got a response, then the recmode will be stored in the
1256            status field
1257         */
1258         if (state->status != CTDB_RECOVERY_NORMAL) {
1259                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1260                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1261         }
1262
1263         return;
1264 }
1265
1266
1267 /* verify that all nodes are in normal recovery mode */
1268 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1269 {
1270         struct verify_recmode_normal_data *rmdata;
1271         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1272         struct ctdb_client_control_state *state;
1273         enum monitor_result status;
1274         int j;
1275         
1276         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1277         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1278         rmdata->count  = 0;
1279         rmdata->status = MONITOR_OK;
1280
1281         /* loop over all active nodes and send an async getrecmode call to 
1282            them*/
1283         for (j=0; j<nodemap->num; j++) {
1284                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1285                         continue;
1286                 }
1287                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1288                                         CONTROL_TIMEOUT(), 
1289                                         nodemap->nodes[j].pnn);
1290                 if (state == NULL) {
1291                         /* we failed to send the control, treat this as 
1292                            an error and try again next iteration
1293                         */                      
1294                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1295                         talloc_free(mem_ctx);
1296                         return MONITOR_FAILED;
1297                 }
1298
1299                 /* set up the callback functions */
1300                 state->async.fn = verify_recmode_normal_callback;
1301                 state->async.private = rmdata;
1302
1303                 /* one more control to wait for to complete */
1304                 rmdata->count++;
1305         }
1306
1307
1308         /* now wait for up to the maximum number of seconds allowed
1309            or until all nodes we expect a response from has replied
1310         */
1311         while (rmdata->count > 0) {
1312                 event_loop_once(ctdb->ev);
1313         }
1314
1315         status = rmdata->status;
1316         talloc_free(mem_ctx);
1317         return status;
1318 }
1319
1320
1321 struct verify_recmaster_data {
1322         uint32_t count;
1323         uint32_t pnn;
1324         enum monitor_result status;
1325 };
1326
1327 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1328 {
1329         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
1330
1331
1332         /* one more node has responded with recmaster data*/
1333         rmdata->count--;
1334
1335         /* if we failed to get the recmaster, then return an error and let
1336            the main loop try again.
1337         */
1338         if (state->state != CTDB_CONTROL_DONE) {
1339                 if (rmdata->status == MONITOR_OK) {
1340                         rmdata->status = MONITOR_FAILED;
1341                 }
1342                 return;
1343         }
1344
1345         /* if we got a response, then the recmaster will be stored in the
1346            status field
1347         */
1348         if (state->status != rmdata->pnn) {
1349                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1350                 rmdata->status = MONITOR_ELECTION_NEEDED;
1351         }
1352
1353         return;
1354 }
1355
1356
1357 /* verify that all nodes agree that we are the recmaster */
1358 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1359 {
1360         struct verify_recmaster_data *rmdata;
1361         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1362         struct ctdb_client_control_state *state;
1363         enum monitor_result status;
1364         int j;
1365         
1366         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1367         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1368         rmdata->count  = 0;
1369         rmdata->pnn    = pnn;
1370         rmdata->status = MONITOR_OK;
1371
1372         /* loop over all active nodes and send an async getrecmaster call to 
1373            them*/
1374         for (j=0; j<nodemap->num; j++) {
1375                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1376                         continue;
1377                 }
1378                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1379                                         CONTROL_TIMEOUT(),
1380                                         nodemap->nodes[j].pnn);
1381                 if (state == NULL) {
1382                         /* we failed to send the control, treat this as 
1383                            an error and try again next iteration
1384                         */                      
1385                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1386                         talloc_free(mem_ctx);
1387                         return MONITOR_FAILED;
1388                 }
1389
1390                 /* set up the callback functions */
1391                 state->async.fn = verify_recmaster_callback;
1392                 state->async.private = rmdata;
1393
1394                 /* one more control to wait for to complete */
1395                 rmdata->count++;
1396         }
1397
1398
1399         /* now wait for up to the maximum number of seconds allowed
1400            or until all nodes we expect a response from has replied
1401         */
1402         while (rmdata->count > 0) {
1403                 event_loop_once(ctdb->ev);
1404         }
1405
1406         status = rmdata->status;
1407         talloc_free(mem_ctx);
1408         return status;
1409 }
1410
1411
1412 /*
1413   the main monitoring loop
1414  */
1415 static void monitor_cluster(struct ctdb_context *ctdb)
1416 {
1417         uint32_t pnn, num_active, recmaster;
1418         TALLOC_CTX *mem_ctx=NULL;
1419         struct ctdb_node_map *nodemap=NULL;
1420         struct ctdb_node_map *remote_nodemap=NULL;
1421         struct ctdb_vnn_map *vnnmap=NULL;
1422         struct ctdb_vnn_map *remote_vnnmap=NULL;
1423         int i, j, ret;
1424         bool need_takeover_run;
1425         struct ctdb_recoverd *rec;
1426
1427         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1428         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1429
1430         rec->ctdb = ctdb;
1431         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1432         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1433
1434         rec->priority_time = timeval_current();
1435
1436         /* register a message port for recovery elections */
1437         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1438
1439         /* and one for when nodes are disabled/enabled */
1440         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1441
1442         /* and one for when nodes are banned */
1443         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1444
1445         /* and one for when nodes are unbanned */
1446         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1447         
1448 again:
1449         need_takeover_run = false;
1450
1451         if (mem_ctx) {
1452                 talloc_free(mem_ctx);
1453                 mem_ctx = NULL;
1454         }
1455         mem_ctx = talloc_new(ctdb);
1456         if (!mem_ctx) {
1457                 DEBUG(0,("Failed to create temporary context\n"));
1458                 exit(-1);
1459         }
1460
1461         /* we only check for recovery once every second */
1462         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1463
1464         /* get relevant tunables */
1465         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1466         if (ret != 0) {
1467                 DEBUG(0,("Failed to get tunables - retrying\n"));
1468                 goto again;
1469         }
1470
1471         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1472         if (pnn == (uint32_t)-1) {
1473                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1474                 goto again;
1475         }
1476
1477         /* get the vnnmap */
1478         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1479         if (ret != 0) {
1480                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1481                 goto again;
1482         }
1483
1484
1485         /* get number of nodes */
1486         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1487         if (ret != 0) {
1488                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1489                 goto again;
1490         }
1491
1492
1493         /* count how many active nodes there are */
1494         num_active = 0;
1495         for (i=0; i<nodemap->num; i++) {
1496                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1497                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1498                 } else {
1499                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1500                 }
1501                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1502                         num_active++;
1503                 }
1504         }
1505
1506
1507         /* check which node is the recovery master */
1508         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1509         if (ret != 0) {
1510                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1511                 goto again;
1512         }
1513
1514         if (recmaster == (uint32_t)-1) {
1515                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1516                 force_election(rec, mem_ctx, pnn, nodemap);
1517                 goto again;
1518         }
1519         
1520         /* verify that the recmaster node is still active */
1521         for (j=0; j<nodemap->num; j++) {
1522                 if (nodemap->nodes[j].pnn==recmaster) {
1523                         break;
1524                 }
1525         }
1526
1527         if (j == nodemap->num) {
1528                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1529                 force_election(rec, mem_ctx, pnn, nodemap);
1530                 goto again;
1531         }
1532
1533         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1534                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1535                 force_election(rec, mem_ctx, pnn, nodemap);
1536                 goto again;
1537         }
1538         
1539
1540         /* if we are not the recmaster then we do not need to check
1541            if recovery is needed
1542          */
1543         if (pnn!=recmaster) {
1544                 goto again;
1545         }
1546
1547
1548         /* update the list of public ips that a node can handle for
1549            all connected nodes
1550         */
1551         for (j=0; j<nodemap->num; j++) {
1552                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1553                         continue;
1554                 }
1555                 /* release any existing data */
1556                 if (ctdb->nodes[j]->public_ips) {
1557                         talloc_free(ctdb->nodes[j]->public_ips);
1558                         ctdb->nodes[j]->public_ips = NULL;
1559                 }
1560                 /* grab a new shiny list of public ips from the node */
1561                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1562                         ctdb->nodes[j]->pnn, 
1563                         ctdb->nodes,
1564                         &ctdb->nodes[j]->public_ips)) {
1565                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1566                                 ctdb->nodes[j]->pnn));
1567                         goto again;
1568                 }
1569         }
1570
1571
1572         /* verify that all active nodes agree that we are the recmaster */
1573         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1574         case MONITOR_RECOVERY_NEEDED:
1575                 /* can not happen */
1576                 goto again;
1577         case MONITOR_ELECTION_NEEDED:
1578                 force_election(rec, mem_ctx, pnn, nodemap);
1579                 goto again;
1580         case MONITOR_OK:
1581                 break;
1582         case MONITOR_FAILED:
1583                 goto again;
1584         }
1585
1586
1587         /* verify that all active nodes are in normal mode 
1588            and not in recovery mode 
1589          */
1590         switch (verify_recmode(ctdb, nodemap)) {
1591         case MONITOR_RECOVERY_NEEDED:
1592                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1593                 goto again;
1594         case MONITOR_FAILED:
1595                 goto again;
1596         case MONITOR_ELECTION_NEEDED:
1597                 /* can not happen */
1598         case MONITOR_OK:
1599                 break;
1600         }
1601
1602
1603
1604         /* get the nodemap for all active remote nodes and verify
1605            they are the same as for this node
1606          */
1607         for (j=0; j<nodemap->num; j++) {
1608                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1609                         continue;
1610                 }
1611                 if (nodemap->nodes[j].pnn == pnn) {
1612                         continue;
1613                 }
1614
1615                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1616                                            mem_ctx, &remote_nodemap);
1617                 if (ret != 0) {
1618                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1619                                   nodemap->nodes[j].pnn));
1620                         goto again;
1621                 }
1622
1623                 /* if the nodes disagree on how many nodes there are
1624                    then this is a good reason to try recovery
1625                  */
1626                 if (remote_nodemap->num != nodemap->num) {
1627                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1628                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1629                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1630                         goto again;
1631                 }
1632
1633                 /* if the nodes disagree on which nodes exist and are
1634                    active, then that is also a good reason to do recovery
1635                  */
1636                 for (i=0;i<nodemap->num;i++) {
1637                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1638                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1639                                           nodemap->nodes[j].pnn, i, 
1640                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1641                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1642                                             vnnmap, nodemap->nodes[j].pnn);
1643                                 goto again;
1644                         }
1645                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1646                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1647                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1648                                           nodemap->nodes[j].pnn, i,
1649                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1650                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1651                                             vnnmap, nodemap->nodes[j].pnn);
1652                                 goto again;
1653                         }
1654                 }
1655
1656                 /* update our nodemap flags according to the other
1657                    server - this gets the NODE_FLAGS_DISABLED
1658                    flag. Note that the remote node is authoritative
1659                    for its flags (except CONNECTED, which we know
1660                    matches in this code) */
1661                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1662                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1663                         need_takeover_run = true;
1664                 }
1665         }
1666
1667
1668         /* there better be the same number of lmasters in the vnn map
1669            as there are active nodes or we will have to do a recovery
1670          */
1671         if (vnnmap->size != num_active) {
1672                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1673                           vnnmap->size, num_active));
1674                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1675                 goto again;
1676         }
1677
1678         /* verify that all active nodes in the nodemap also exist in 
1679            the vnnmap.
1680          */
1681         for (j=0; j<nodemap->num; j++) {
1682                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1683                         continue;
1684                 }
1685                 if (nodemap->nodes[j].pnn == pnn) {
1686                         continue;
1687                 }
1688
1689                 for (i=0; i<vnnmap->size; i++) {
1690                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1691                                 break;
1692                         }
1693                 }
1694                 if (i == vnnmap->size) {
1695                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1696                                   nodemap->nodes[j].pnn));
1697                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1698                         goto again;
1699                 }
1700         }
1701
1702         
1703         /* verify that all other nodes have the same vnnmap
1704            and are from the same generation
1705          */
1706         for (j=0; j<nodemap->num; j++) {
1707                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1708                         continue;
1709                 }
1710                 if (nodemap->nodes[j].pnn == pnn) {
1711                         continue;
1712                 }
1713
1714                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1715                                           mem_ctx, &remote_vnnmap);
1716                 if (ret != 0) {
1717                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1718                                   nodemap->nodes[j].pnn));
1719                         goto again;
1720                 }
1721
1722                 /* verify the vnnmap generation is the same */
1723                 if (vnnmap->generation != remote_vnnmap->generation) {
1724                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1725                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1726                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1727                         goto again;
1728                 }
1729
1730                 /* verify the vnnmap size is the same */
1731                 if (vnnmap->size != remote_vnnmap->size) {
1732                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1733                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1734                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1735                         goto again;
1736                 }
1737
1738                 /* verify the vnnmap is the same */
1739                 for (i=0;i<vnnmap->size;i++) {
1740                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1741                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1742                                           nodemap->nodes[j].pnn));
1743                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1744                                             vnnmap, nodemap->nodes[j].pnn);
1745                                 goto again;
1746                         }
1747                 }
1748         }
1749
1750         /* we might need to change who has what IP assigned */
1751         if (need_takeover_run && ctdb->vnn) {
1752                 ret = ctdb_takeover_run(ctdb, nodemap);
1753                 if (ret != 0) {
1754                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1755                         ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), 
1756                                              ctdb->pnn, CTDB_RECOVERY_ACTIVE);
1757                 }
1758         }
1759
1760         goto again;
1761
1762 }
1763
1764 /*
1765   event handler for when the main ctdbd dies
1766  */
1767 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1768                                  uint16_t flags, void *private_data)
1769 {
1770         DEBUG(0,("recovery daemon parent died - exiting\n"));
1771         _exit(1);
1772 }
1773
1774
1775
1776 /*
1777   startup the recovery daemon as a child of the main ctdb daemon
1778  */
1779 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1780 {
1781         int ret;
1782         int fd[2];
1783         pid_t child;
1784
1785         if (pipe(fd) != 0) {
1786                 return -1;
1787         }
1788
1789         child = fork();
1790         if (child == -1) {
1791                 return -1;
1792         }
1793         
1794         if (child != 0) {
1795                 close(fd[0]);
1796                 return 0;
1797         }
1798
1799         close(fd[1]);
1800
1801         /* shutdown the transport */
1802         ctdb->methods->shutdown(ctdb);
1803
1804         /* get a new event context */
1805         talloc_free(ctdb->ev);
1806         ctdb->ev = event_context_init(ctdb);
1807
1808         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1809                      ctdb_recoverd_parent, &fd[0]);     
1810
1811         close(ctdb->daemon.sd);
1812         ctdb->daemon.sd = -1;
1813
1814         srandom(getpid() ^ time(NULL));
1815
1816         /* initialise ctdb */
1817         ret = ctdb_socket_connect(ctdb);
1818         if (ret != 0) {
1819                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1820                 exit(1);
1821         }
1822
1823         monitor_cluster(ctdb);
1824
1825         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1826         return -1;
1827 }