merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45 };
46
47 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
48 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
49
50 /*
51   unban a node
52  */
53 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
54 {
55         struct ctdb_context *ctdb = rec->ctdb;
56
57         if (!ctdb_validate_pnn(ctdb, pnn)) {
58                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
59                 return;
60         }
61
62         if (rec->banned_nodes[pnn] == NULL) {
63                 return;
64         }
65
66         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
67
68         talloc_free(rec->banned_nodes[pnn]);
69         rec->banned_nodes[pnn] = NULL;
70 }
71
72
73 /*
74   called when a ban has timed out
75  */
76 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
77 {
78         struct ban_state *state = talloc_get_type(p, struct ban_state);
79         struct ctdb_recoverd *rec = state->rec;
80         uint32_t pnn = state->banned_node;
81
82         DEBUG(0,("Node %u is now unbanned\n", pnn));
83         ctdb_unban_node(rec, pnn);
84 }
85
86 /*
87   ban a node for a period of time
88  */
89 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
90 {
91         struct ctdb_context *ctdb = rec->ctdb;
92
93         if (!ctdb_validate_pnn(ctdb, pnn)) {
94                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
95                 return;
96         }
97
98         if (pnn == ctdb->pnn) {
99                 DEBUG(0,("self ban - lowering our election priority\n"));
100                 /* banning ourselves - lower our election priority */
101                 rec->priority_time = timeval_current();
102         }
103
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
105
106         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
107         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
108
109         rec->banned_nodes[pnn]->rec = rec;
110         rec->banned_nodes[pnn]->banned_node = pnn;
111
112         if (ban_time != 0) {
113                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
114                                 timeval_current_ofs(ban_time, 0),
115                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
116         }
117 }
118
119 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
120
121
122 struct freeze_node_data {
123         uint32_t count;
124         enum monitor_result status;
125 };
126
127
128 static void freeze_node_callback(struct ctdb_client_control_state *state)
129 {
130         struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
131
132
133         /* one more node has responded to our freeze node*/
134         fndata->count--;
135
136         /* if we failed to freeze the node, we must trigger another recovery */
137         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
138                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
139                 fndata->status = MONITOR_RECOVERY_NEEDED;
140         }
141
142         return;
143 }
144
145
146
147 /* freeze all nodes */
148 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
149 {
150         struct freeze_node_data *fndata;
151         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
152         struct ctdb_client_control_state *state;
153         enum monitor_result status;
154         int j;
155         
156         fndata = talloc(mem_ctx, struct freeze_node_data);
157         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
158         fndata->count  = 0;
159         fndata->status = MONITOR_OK;
160
161         /* loop over all active nodes and send an async freeze call to 
162            them*/
163         for (j=0; j<nodemap->num; j++) {
164                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
165                         continue;
166                 }
167                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
168                                         CONTROL_TIMEOUT(), 
169                                         nodemap->nodes[j].pnn);
170                 if (state == NULL) {
171                         /* we failed to send the control, treat this as 
172                            an error and try again next iteration
173                         */                      
174                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
175                         talloc_free(mem_ctx);
176                         return MONITOR_RECOVERY_NEEDED;
177                 }
178
179                 /* set up the callback functions */
180                 state->async.fn = freeze_node_callback;
181                 state->async.private = fndata;
182
183                 /* one more control to wait for to complete */
184                 fndata->count++;
185         }
186
187
188         /* now wait for up to the maximum number of seconds allowed
189            or until all nodes we expect a response from has replied
190         */
191         while (fndata->count > 0) {
192                 event_loop_once(ctdb->ev);
193         }
194
195         status = fndata->status;
196         talloc_free(mem_ctx);
197         return status;
198 }
199
200
201 /*
202   change recovery mode on all nodes
203  */
204 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
205 {
206         int j, ret;
207
208         /* freeze all nodes */
209         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
210                 ret = freeze_all_nodes(ctdb, nodemap);
211                 if (ret != MONITOR_OK) {
212                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
213                         return -1;
214                 }
215         }
216
217
218         /* set recovery mode to active on all nodes */
219         for (j=0; j<nodemap->num; j++) {
220                 /* dont change it for nodes that are unavailable */
221                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
222                         continue;
223                 }
224
225                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
226                 if (ret != 0) {
227                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
228                         return -1;
229                 }
230
231                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
232                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
233                         if (ret != 0) {
234                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
235                                 return -1;
236                         }
237                 }
238         }
239
240         return 0;
241 }
242
243 /*
244   change recovery master on all node
245  */
246 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
247 {
248         int j, ret;
249
250         /* set recovery master to pnn on all nodes */
251         for (j=0; j<nodemap->num; j++) {
252                 /* dont change it for nodes that are unavailable */
253                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
254                         continue;
255                 }
256
257                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
258                 if (ret != 0) {
259                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
260                         return -1;
261                 }
262         }
263
264         return 0;
265 }
266
267
268 /*
269   ensure all other nodes have attached to any databases that we have
270  */
271 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
272                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
273 {
274         int i, j, db, ret;
275         struct ctdb_dbid_map *remote_dbmap;
276
277         /* verify that all other nodes have all our databases */
278         for (j=0; j<nodemap->num; j++) {
279                 /* we dont need to ourself ourselves */
280                 if (nodemap->nodes[j].pnn == pnn) {
281                         continue;
282                 }
283                 /* dont check nodes that are unavailable */
284                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
285                         continue;
286                 }
287
288                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
289                                          mem_ctx, &remote_dbmap);
290                 if (ret != 0) {
291                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
292                         return -1;
293                 }
294
295                 /* step through all local databases */
296                 for (db=0; db<dbmap->num;db++) {
297                         const char *name;
298
299
300                         for (i=0;i<remote_dbmap->num;i++) {
301                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
302                                         break;
303                                 }
304                         }
305                         /* the remote node already have this database */
306                         if (i!=remote_dbmap->num) {
307                                 continue;
308                         }
309                         /* ok so we need to create this database */
310                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
311                         if (ret != 0) {
312                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
313                                 return -1;
314                         }
315                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
316                         if (ret != 0) {
317                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
318                                 return -1;
319                         }
320                 }
321         }
322
323         return 0;
324 }
325
326
327 /*
328   ensure we are attached to any databases that anyone else is attached to
329  */
330 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
331                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
332 {
333         int i, j, db, ret;
334         struct ctdb_dbid_map *remote_dbmap;
335
336         /* verify that we have all database any other node has */
337         for (j=0; j<nodemap->num; j++) {
338                 /* we dont need to ourself ourselves */
339                 if (nodemap->nodes[j].pnn == pnn) {
340                         continue;
341                 }
342                 /* dont check nodes that are unavailable */
343                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
344                         continue;
345                 }
346
347                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
348                                          mem_ctx, &remote_dbmap);
349                 if (ret != 0) {
350                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
351                         return -1;
352                 }
353
354                 /* step through all databases on the remote node */
355                 for (db=0; db<remote_dbmap->num;db++) {
356                         const char *name;
357
358                         for (i=0;i<(*dbmap)->num;i++) {
359                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
360                                         break;
361                                 }
362                         }
363                         /* we already have this db locally */
364                         if (i!=(*dbmap)->num) {
365                                 continue;
366                         }
367                         /* ok so we need to create this database and
368                            rebuild dbmap
369                          */
370                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
371                                             remote_dbmap->dbids[db], mem_ctx, &name);
372                         if (ret != 0) {
373                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
374                                           nodemap->nodes[j].pnn));
375                                 return -1;
376                         }
377                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
378                         if (ret != 0) {
379                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
380                                 return -1;
381                         }
382                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
383                         if (ret != 0) {
384                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
385                                 return -1;
386                         }
387                 }
388         }
389
390         return 0;
391 }
392
393
394 /*
395   pull all the remote database contents into ours
396  */
397 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
398                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
399 {
400         int i, j, ret;
401
402         /* pull all records from all other nodes across onto this node
403            (this merges based on rsn)
404         */
405         for (i=0;i<dbmap->num;i++) {
406                 for (j=0; j<nodemap->num; j++) {
407                         /* we dont need to merge with ourselves */
408                         if (nodemap->nodes[j].pnn == pnn) {
409                                 continue;
410                         }
411                         /* dont merge from nodes that are unavailable */
412                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
413                                 continue;
414                         }
415                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
416                                                pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
417                         if (ret != 0) {
418                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
419                                           nodemap->nodes[j].pnn, pnn));
420                                 return -1;
421                         }
422                 }
423         }
424
425         return 0;
426 }
427
428
429 /*
430   change the dmaster on all databases to point to us
431  */
432 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
433                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
434 {
435         int i, j, ret;
436
437         /* update dmaster to point to this node for all databases/nodes */
438         for (i=0;i<dbmap->num;i++) {
439                 for (j=0; j<nodemap->num; j++) {
440                         /* dont repoint nodes that are unavailable */
441                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
442                                 continue;
443                         }
444                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
445                         if (ret != 0) {
446                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
447                                 return -1;
448                         }
449                 }
450         }
451
452         return 0;
453 }
454
455
456 /*
457   update flags on all active nodes
458  */
459 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
460 {
461         int i;
462         for (i=0;i<nodemap->num;i++) {
463                 struct ctdb_node_flag_change c;
464                 TDB_DATA data;
465
466                 c.pnn = nodemap->nodes[i].pnn;
467                 c.old_flags = nodemap->nodes[i].flags;
468                 c.new_flags = nodemap->nodes[i].flags;
469
470                 data.dptr = (uint8_t *)&c;
471                 data.dsize = sizeof(c);
472
473                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
474                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
475
476         }
477         return 0;
478 }
479
480 /*
481   vacuum one database
482  */
483 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
484 {
485         uint64_t max_rsn;
486         int ret, i;
487
488         /* find max rsn on our local node for this db */
489         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
490         if (ret != 0) {
491                 return -1;
492         }
493
494         /* set rsn on non-empty records to max_rsn+1 */
495         for (i=0;i<nodemap->num;i++) {
496                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
497                         continue;
498                 }
499                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
500                                                  db_id, max_rsn+1);
501                 if (ret != 0) {
502                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
503                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
504                         return -1;
505                 }
506         }
507
508         /* delete records with rsn < max_rsn+1 on all nodes */
509         for (i=0;i<nodemap->num;i++) {
510                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
511                         continue;
512                 }
513                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
514                                                  db_id, max_rsn+1);
515                 if (ret != 0) {
516                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
517                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
518                         return -1;
519                 }
520         }
521
522
523         return 0;
524 }
525
526
527 /*
528   vacuum all attached databases
529  */
530 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
531                                 struct ctdb_dbid_map *dbmap)
532 {
533         int i;
534
535         /* update dmaster to point to this node for all databases/nodes */
536         for (i=0;i<dbmap->num;i++) {
537                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
538                         return -1;
539                 }
540         }
541         return 0;
542 }
543
544
545 /*
546   push out all our database contents to all other nodes
547  */
548 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
549                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
550 {
551         int i, j, ret;
552
553         /* push all records out to the nodes again */
554         for (i=0;i<dbmap->num;i++) {
555                 for (j=0; j<nodemap->num; j++) {
556                         /* we dont need to push to ourselves */
557                         if (nodemap->nodes[j].pnn == pnn) {
558                                 continue;
559                         }
560                         /* dont push to nodes that are unavailable */
561                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
562                                 continue;
563                         }
564                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
565                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
566                         if (ret != 0) {
567                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
568                                           pnn, nodemap->nodes[j].pnn));
569                                 return -1;
570                         }
571                 }
572         }
573
574         return 0;
575 }
576
577
578 /*
579   ensure all nodes have the same vnnmap we do
580  */
581 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
582                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
583 {
584         int j, ret;
585
586         /* push the new vnn map out to all the nodes */
587         for (j=0; j<nodemap->num; j++) {
588                 /* dont push to nodes that are unavailable */
589                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
590                         continue;
591                 }
592
593                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
594                 if (ret != 0) {
595                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
596                         return -1;
597                 }
598         }
599
600         return 0;
601 }
602
603
604 /*
605   handler for when the admin bans a node
606 */
607 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
608                         TDB_DATA data, void *private_data)
609 {
610         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
611         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
612         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
613         uint32_t recmaster;
614         int ret;
615
616         if (data.dsize != sizeof(*b)) {
617                 DEBUG(0,("Bad data in ban_handler\n"));
618                 talloc_free(mem_ctx);
619                 return;
620         }
621
622         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
623         if (ret != 0) {
624                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
625                 talloc_free(mem_ctx);
626                 return;
627         }
628
629         if (recmaster != ctdb->pnn) {
630                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
631                 talloc_free(mem_ctx);
632                 return;
633         }
634
635         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
636                  b->pnn, b->ban_time));
637         ctdb_ban_node(rec, b->pnn, b->ban_time);
638         talloc_free(mem_ctx);
639 }
640
641 /*
642   handler for when the admin unbans a node
643 */
644 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
645                           TDB_DATA data, void *private_data)
646 {
647         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
648         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
649         uint32_t pnn;
650         int ret;
651         uint32_t recmaster;
652
653         if (data.dsize != sizeof(uint32_t)) {
654                 DEBUG(0,("Bad data in unban_handler\n"));
655                 talloc_free(mem_ctx);
656                 return;
657         }
658         pnn = *(uint32_t *)data.dptr;
659
660         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
661         if (ret != 0) {
662                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
663                 talloc_free(mem_ctx);
664                 return;
665         }
666
667         if (recmaster != ctdb->pnn) {
668                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
669                 talloc_free(mem_ctx);
670                 return;
671         }
672
673         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
674         ctdb_unban_node(rec, pnn);
675         talloc_free(mem_ctx);
676 }
677
678
679
680 /*
681   called when ctdb_wait_timeout should finish
682  */
683 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
684                               struct timeval yt, void *p)
685 {
686         uint32_t *timed_out = (uint32_t *)p;
687         (*timed_out) = 1;
688 }
689
690 /*
691   wait for a given number of seconds
692  */
693 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
694 {
695         uint32_t timed_out = 0;
696         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
697         while (!timed_out) {
698                 event_loop_once(ctdb->ev);
699         }
700 }
701
702 /* Create a new random generation ip. 
703    The generation id can not be the INVALID_GENERATION id
704 */
705 static uint32_t new_generation(void)
706 {
707         uint32_t generation;
708
709         while (1) {
710                 generation = random();
711
712                 if (generation != INVALID_GENERATION) {
713                         break;
714                 }
715         }
716
717         return generation;
718 }
719                 
720 /*
721   we are the recmaster, and recovery is needed - start a recovery run
722  */
723 static int do_recovery(struct ctdb_recoverd *rec, 
724                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
725                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
726                        uint32_t culprit)
727 {
728         struct ctdb_context *ctdb = rec->ctdb;
729         int i, j, ret;
730         uint32_t generation;
731         struct ctdb_dbid_map *dbmap;
732
733         if (rec->last_culprit != culprit ||
734             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
735                 /* either a new node is the culprit, or we've decide to forgive them */
736                 rec->last_culprit = culprit;
737                 rec->first_recover_time = timeval_current();
738                 rec->culprit_counter = 0;
739         }
740         rec->culprit_counter++;
741
742         if (rec->culprit_counter > 2*nodemap->num) {
743                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
744                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
745                          ctdb->tunable.recovery_ban_period));
746                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
747         }
748
749         if (!ctdb_recovery_lock(ctdb, true)) {
750                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
751                 return -1;
752         }
753
754         /* set recovery mode to active on all nodes */
755         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
756         if (ret!=0) {
757                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
758                 return -1;
759         }
760
761         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
762
763         /* pick a new generation number */
764         generation = new_generation();
765
766         /* change the vnnmap on this node to use the new generation 
767            number but not on any other nodes.
768            this guarantees that if we abort the recovery prematurely
769            for some reason (a node stops responding?)
770            that we can just return immediately and we will reenter
771            recovery shortly again.
772            I.e. we deliberately leave the cluster with an inconsistent
773            generation id to allow us to abort recovery at any stage and
774            just restart it from scratch.
775          */
776         vnnmap->generation = generation;
777         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
778         if (ret != 0) {
779                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
780                 return -1;
781         }
782
783         /* get a list of all databases */
784         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
785         if (ret != 0) {
786                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
787                 return -1;
788         }
789
790
791
792         /* verify that all other nodes have all our databases */
793         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
794         if (ret != 0) {
795                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
796                 return -1;
797         }
798
799         /* verify that we have all the databases any other node has */
800         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
801         if (ret != 0) {
802                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
803                 return -1;
804         }
805
806
807
808         /* verify that all other nodes have all our databases */
809         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
810         if (ret != 0) {
811                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
812                 return -1;
813         }
814
815
816         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
817
818         /* pull all remote databases onto the local node */
819         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
820         if (ret != 0) {
821                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
822                 return -1;
823         }
824
825         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
826
827         /* push all local databases to the remote nodes */
828         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
829         if (ret != 0) {
830                 DEBUG(0, (__location__ " Unable to push local databases\n"));
831                 return -1;
832         }
833
834         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
835
836         /* build a new vnn map with all the currently active and
837            unbanned nodes */
838         generation = new_generation();
839         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
840         CTDB_NO_MEMORY(ctdb, vnnmap);
841         vnnmap->generation = generation;
842         vnnmap->size = num_active;
843         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
844         for (i=j=0;i<nodemap->num;i++) {
845                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
846                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
847                 }
848         }
849
850
851
852         /* update to the new vnnmap on all nodes */
853         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
854         if (ret != 0) {
855                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
856                 return -1;
857         }
858
859         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
860
861         /* update recmaster to point to us for all nodes */
862         ret = set_recovery_master(ctdb, nodemap, pnn);
863         if (ret!=0) {
864                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
865                 return -1;
866         }
867
868         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
869
870         /* repoint all local and remote database records to the local
871            node as being dmaster
872          */
873         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
874         if (ret != 0) {
875                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
876                 return -1;
877         }
878
879         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
880
881         /*
882           update all nodes to have the same flags that we have
883          */
884         ret = update_flags_on_all_nodes(ctdb, nodemap);
885         if (ret != 0) {
886                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
887                 return -1;
888         }
889         
890         DEBUG(1, (__location__ " Recovery - updated flags\n"));
891
892         /*
893           run a vacuum operation on empty records
894          */
895         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
896         if (ret != 0) {
897                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
898                 return -1;
899         }
900
901         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
902
903         /*
904           if enabled, tell nodes to takeover their public IPs
905          */
906         if (ctdb->vnn) {
907                 ret = ctdb_takeover_run(ctdb, nodemap);
908                 if (ret != 0) {
909                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
910                         return -1;
911                 }
912                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
913         }
914
915
916         /* disable recovery mode */
917         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
918         if (ret!=0) {
919                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
920                 return -1;
921         }
922
923         /* send a message to all clients telling them that the cluster 
924            has been reconfigured */
925         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
926
927         DEBUG(0, (__location__ " Recovery complete\n"));
928
929         /* We just finished a recovery successfully. 
930            We now wait for rerecovery_timeout before we allow 
931            another recovery to take place.
932         */
933         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
934         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
935         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
936
937         return 0;
938 }
939
940
941 /*
942   elections are won by first checking the number of connected nodes, then
943   the priority time, then the pnn
944  */
945 struct election_message {
946         uint32_t num_connected;
947         struct timeval priority_time;
948         uint32_t pnn;
949 };
950
951 /*
952   form this nodes election data
953  */
954 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
955 {
956         int ret, i;
957         struct ctdb_node_map *nodemap;
958         struct ctdb_context *ctdb = rec->ctdb;
959
960         ZERO_STRUCTP(em);
961
962         em->pnn = rec->ctdb->pnn;
963         em->priority_time = rec->priority_time;
964
965         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
966         if (ret != 0) {
967                 return;
968         }
969
970         for (i=0;i<nodemap->num;i++) {
971                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
972                         em->num_connected++;
973                 }
974         }
975         talloc_free(nodemap);
976 }
977
978 /*
979   see if the given election data wins
980  */
981 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
982 {
983         struct election_message myem;
984         int cmp;
985
986         ctdb_election_data(rec, &myem);
987
988         /* try to use the most connected node */
989         cmp = (int)myem.num_connected - (int)em->num_connected;
990
991         /* then the longest running node */
992         if (cmp == 0) {
993                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
994         }
995
996         if (cmp == 0) {
997                 cmp = (int)myem.pnn - (int)em->pnn;
998         }
999
1000         return cmp > 0;
1001 }
1002
1003 /*
1004   send out an election request
1005  */
1006 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1007 {
1008         int ret;
1009         TDB_DATA election_data;
1010         struct election_message emsg;
1011         uint64_t srvid;
1012         struct ctdb_context *ctdb = rec->ctdb;
1013         
1014         srvid = CTDB_SRVID_RECOVERY;
1015
1016         ctdb_election_data(rec, &emsg);
1017
1018         election_data.dsize = sizeof(struct election_message);
1019         election_data.dptr  = (unsigned char *)&emsg;
1020
1021
1022         /* first we assume we will win the election and set 
1023            recoverymaster to be ourself on the current node
1024          */
1025         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1026         if (ret != 0) {
1027                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1028                 return -1;
1029         }
1030
1031
1032         /* send an election message to all active nodes */
1033         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1034
1035         return 0;
1036 }
1037
1038 /*
1039   this function will unban all nodes in the cluster
1040 */
1041 static void unban_all_nodes(struct ctdb_context *ctdb)
1042 {
1043         int ret, i;
1044         struct ctdb_node_map *nodemap;
1045         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1046         
1047         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1048         if (ret != 0) {
1049                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1050                 return;
1051         }
1052
1053         for (i=0;i<nodemap->num;i++) {
1054                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1055                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1056                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1057                 }
1058         }
1059
1060         talloc_free(tmp_ctx);
1061 }
1062
1063 /*
1064   handler for recovery master elections
1065 */
1066 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1067                              TDB_DATA data, void *private_data)
1068 {
1069         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1070         int ret;
1071         struct election_message *em = (struct election_message *)data.dptr;
1072         TALLOC_CTX *mem_ctx;
1073
1074         mem_ctx = talloc_new(ctdb);
1075
1076         /* someone called an election. check their election data
1077            and if we disagree and we would rather be the elected node, 
1078            send a new election message to all other nodes
1079          */
1080         if (ctdb_election_win(rec, em)) {
1081                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1082                 if (ret!=0) {
1083                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1084                 }
1085                 talloc_free(mem_ctx);
1086                 /*unban_all_nodes(ctdb);*/
1087                 return;
1088         }
1089
1090         /* release the recmaster lock */
1091         if (em->pnn != ctdb->pnn &&
1092             ctdb->recovery_lock_fd != -1) {
1093                 close(ctdb->recovery_lock_fd);
1094                 ctdb->recovery_lock_fd = -1;
1095                 unban_all_nodes(ctdb);
1096         }
1097
1098         /* ok, let that guy become recmaster then */
1099         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1100         if (ret != 0) {
1101                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1102                 talloc_free(mem_ctx);
1103                 return;
1104         }
1105
1106         /* release any bans */
1107         rec->last_culprit = (uint32_t)-1;
1108         talloc_free(rec->banned_nodes);
1109         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1110         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1111
1112         talloc_free(mem_ctx);
1113         return;
1114 }
1115
1116
1117 /*
1118   force the start of the election process
1119  */
1120 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1121                            struct ctdb_node_map *nodemap)
1122 {
1123         int ret;
1124         struct ctdb_context *ctdb = rec->ctdb;
1125
1126         /* set all nodes to recovery mode to stop all internode traffic */
1127         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1128         if (ret!=0) {
1129                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1130                 return;
1131         }
1132         
1133         ret = send_election_request(rec, mem_ctx, pnn);
1134         if (ret!=0) {
1135                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1136                 return;
1137         }
1138
1139         /* wait for a few seconds to collect all responses */
1140         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1141 }
1142
1143
1144
1145 /*
1146   handler for when a node changes its flags
1147 */
1148 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1149                             TDB_DATA data, void *private_data)
1150 {
1151         int ret;
1152         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1153         struct ctdb_node_map *nodemap=NULL;
1154         TALLOC_CTX *tmp_ctx;
1155         uint32_t changed_flags;
1156         int i;
1157
1158         if (data.dsize != sizeof(*c)) {
1159                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1160                 return;
1161         }
1162
1163         tmp_ctx = talloc_new(ctdb);
1164         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1165
1166         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1167
1168         for (i=0;i<nodemap->num;i++) {
1169                 if (nodemap->nodes[i].pnn == c->pnn) break;
1170         }
1171
1172         if (i == nodemap->num) {
1173                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1174                 talloc_free(tmp_ctx);
1175                 return;
1176         }
1177
1178         changed_flags = c->old_flags ^ c->new_flags;
1179
1180         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1181            This flag is handled locally based on whether the local node
1182            can communicate with the node or not.
1183         */
1184         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1185         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1186                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1187         }
1188
1189         if (nodemap->nodes[i].flags != c->new_flags) {
1190                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1191         }
1192
1193         nodemap->nodes[i].flags = c->new_flags;
1194
1195         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1196                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1197
1198         if (ret == 0) {
1199                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1200                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1201         }
1202         
1203         if (ret == 0 &&
1204             ctdb->recovery_master == ctdb->pnn &&
1205             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1206             ctdb->vnn) {
1207                 /* Only do the takeover run if the perm disabled or unhealthy
1208                    flags changed since these will cause an ip failover but not
1209                    a recovery.
1210                    If the node became disconnected or banned this will also
1211                    lead to an ip address failover but that is handled 
1212                    during recovery
1213                 */
1214                 if (changed_flags & NODE_FLAGS_DISABLED) {
1215                         ret = ctdb_takeover_run(ctdb, nodemap);
1216                         if (ret != 0) {
1217                                 DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1218                         }
1219                         /* send a message to all clients telling them that the 
1220                            cluster has been reconfigured */
1221                         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1222                 }
1223         }
1224
1225         talloc_free(tmp_ctx);
1226 }
1227
1228
1229
1230 struct verify_recmode_normal_data {
1231         uint32_t count;
1232         enum monitor_result status;
1233 };
1234
1235 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1236 {
1237         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
1238
1239
1240         /* one more node has responded with recmode data*/
1241         rmdata->count--;
1242
1243         /* if we failed to get the recmode, then return an error and let
1244            the main loop try again.
1245         */
1246         if (state->state != CTDB_CONTROL_DONE) {
1247                 if (rmdata->status == MONITOR_OK) {
1248                         rmdata->status = MONITOR_FAILED;
1249                 }
1250                 return;
1251         }
1252
1253         /* if we got a response, then the recmode will be stored in the
1254            status field
1255         */
1256         if (state->status != CTDB_RECOVERY_NORMAL) {
1257                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1258                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1259         }
1260
1261         return;
1262 }
1263
1264
1265 /* verify that all nodes are in normal recovery mode */
1266 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1267 {
1268         struct verify_recmode_normal_data *rmdata;
1269         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1270         struct ctdb_client_control_state *state;
1271         enum monitor_result status;
1272         int j;
1273         
1274         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1275         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1276         rmdata->count  = 0;
1277         rmdata->status = MONITOR_OK;
1278
1279         /* loop over all active nodes and send an async getrecmode call to 
1280            them*/
1281         for (j=0; j<nodemap->num; j++) {
1282                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1283                         continue;
1284                 }
1285                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1286                                         CONTROL_TIMEOUT(), 
1287                                         nodemap->nodes[j].pnn);
1288                 if (state == NULL) {
1289                         /* we failed to send the control, treat this as 
1290                            an error and try again next iteration
1291                         */                      
1292                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1293                         talloc_free(mem_ctx);
1294                         return MONITOR_FAILED;
1295                 }
1296
1297                 /* set up the callback functions */
1298                 state->async.fn = verify_recmode_normal_callback;
1299                 state->async.private = rmdata;
1300
1301                 /* one more control to wait for to complete */
1302                 rmdata->count++;
1303         }
1304
1305
1306         /* now wait for up to the maximum number of seconds allowed
1307            or until all nodes we expect a response from has replied
1308         */
1309         while (rmdata->count > 0) {
1310                 event_loop_once(ctdb->ev);
1311         }
1312
1313         status = rmdata->status;
1314         talloc_free(mem_ctx);
1315         return status;
1316 }
1317
1318
1319 struct verify_recmaster_data {
1320         uint32_t count;
1321         uint32_t pnn;
1322         enum monitor_result status;
1323 };
1324
1325 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1326 {
1327         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
1328
1329
1330         /* one more node has responded with recmaster data*/
1331         rmdata->count--;
1332
1333         /* if we failed to get the recmaster, then return an error and let
1334            the main loop try again.
1335         */
1336         if (state->state != CTDB_CONTROL_DONE) {
1337                 if (rmdata->status == MONITOR_OK) {
1338                         rmdata->status = MONITOR_FAILED;
1339                 }
1340                 return;
1341         }
1342
1343         /* if we got a response, then the recmaster will be stored in the
1344            status field
1345         */
1346         if (state->status != rmdata->pnn) {
1347                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1348                 rmdata->status = MONITOR_ELECTION_NEEDED;
1349         }
1350
1351         return;
1352 }
1353
1354
1355 /* verify that all nodes agree that we are the recmaster */
1356 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1357 {
1358         struct verify_recmaster_data *rmdata;
1359         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1360         struct ctdb_client_control_state *state;
1361         enum monitor_result status;
1362         int j;
1363         
1364         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1365         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1366         rmdata->count  = 0;
1367         rmdata->pnn    = pnn;
1368         rmdata->status = MONITOR_OK;
1369
1370         /* loop over all active nodes and send an async getrecmaster call to 
1371            them*/
1372         for (j=0; j<nodemap->num; j++) {
1373                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1374                         continue;
1375                 }
1376                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1377                                         CONTROL_TIMEOUT(),
1378                                         nodemap->nodes[j].pnn);
1379                 if (state == NULL) {
1380                         /* we failed to send the control, treat this as 
1381                            an error and try again next iteration
1382                         */                      
1383                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1384                         talloc_free(mem_ctx);
1385                         return MONITOR_FAILED;
1386                 }
1387
1388                 /* set up the callback functions */
1389                 state->async.fn = verify_recmaster_callback;
1390                 state->async.private = rmdata;
1391
1392                 /* one more control to wait for to complete */
1393                 rmdata->count++;
1394         }
1395
1396
1397         /* now wait for up to the maximum number of seconds allowed
1398            or until all nodes we expect a response from has replied
1399         */
1400         while (rmdata->count > 0) {
1401                 event_loop_once(ctdb->ev);
1402         }
1403
1404         status = rmdata->status;
1405         talloc_free(mem_ctx);
1406         return status;
1407 }
1408
1409
1410 /*
1411   the main monitoring loop
1412  */
1413 static void monitor_cluster(struct ctdb_context *ctdb)
1414 {
1415         uint32_t pnn, num_active, recmaster;
1416         TALLOC_CTX *mem_ctx=NULL;
1417         struct ctdb_node_map *nodemap=NULL;
1418         struct ctdb_node_map *remote_nodemap=NULL;
1419         struct ctdb_vnn_map *vnnmap=NULL;
1420         struct ctdb_vnn_map *remote_vnnmap=NULL;
1421         int i, j, ret;
1422         bool need_takeover_run;
1423         struct ctdb_recoverd *rec;
1424
1425         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1426         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1427
1428         rec->ctdb = ctdb;
1429         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1430         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1431
1432         rec->priority_time = timeval_current();
1433
1434         /* register a message port for recovery elections */
1435         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1436
1437         /* and one for when nodes are disabled/enabled */
1438         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1439
1440         /* and one for when nodes are banned */
1441         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1442
1443         /* and one for when nodes are unbanned */
1444         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1445         
1446 again:
1447         need_takeover_run = false;
1448
1449         if (mem_ctx) {
1450                 talloc_free(mem_ctx);
1451                 mem_ctx = NULL;
1452         }
1453         mem_ctx = talloc_new(ctdb);
1454         if (!mem_ctx) {
1455                 DEBUG(0,("Failed to create temporary context\n"));
1456                 exit(-1);
1457         }
1458
1459         /* we only check for recovery once every second */
1460         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1461
1462         /* get relevant tunables */
1463         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1464         if (ret != 0) {
1465                 DEBUG(0,("Failed to get tunables - retrying\n"));
1466                 goto again;
1467         }
1468
1469         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1470         if (pnn == (uint32_t)-1) {
1471                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1472                 goto again;
1473         }
1474
1475         /* get the vnnmap */
1476         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1477         if (ret != 0) {
1478                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1479                 goto again;
1480         }
1481
1482
1483         /* get number of nodes */
1484         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1485         if (ret != 0) {
1486                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1487                 goto again;
1488         }
1489
1490
1491         /* count how many active nodes there are */
1492         num_active = 0;
1493         for (i=0; i<nodemap->num; i++) {
1494                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1495                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1496                 } else {
1497                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1498                 }
1499                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1500                         num_active++;
1501                 }
1502         }
1503
1504
1505         /* check which node is the recovery master */
1506         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1507         if (ret != 0) {
1508                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1509                 goto again;
1510         }
1511
1512         if (recmaster == (uint32_t)-1) {
1513                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1514                 force_election(rec, mem_ctx, pnn, nodemap);
1515                 goto again;
1516         }
1517         
1518         /* verify that the recmaster node is still active */
1519         for (j=0; j<nodemap->num; j++) {
1520                 if (nodemap->nodes[j].pnn==recmaster) {
1521                         break;
1522                 }
1523         }
1524
1525         if (j == nodemap->num) {
1526                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1527                 force_election(rec, mem_ctx, pnn, nodemap);
1528                 goto again;
1529         }
1530
1531         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1532                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1533                 force_election(rec, mem_ctx, pnn, nodemap);
1534                 goto again;
1535         }
1536         
1537
1538         /* if we are not the recmaster then we do not need to check
1539            if recovery is needed
1540          */
1541         if (pnn!=recmaster) {
1542                 goto again;
1543         }
1544
1545
1546         /* update the list of public ips that a node can handle for
1547            all connected nodes
1548         */
1549         for (j=0; j<nodemap->num; j++) {
1550                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1551                         continue;
1552                 }
1553                 /* release any existing data */
1554                 if (ctdb->nodes[j]->public_ips) {
1555                         talloc_free(ctdb->nodes[j]->public_ips);
1556                         ctdb->nodes[j]->public_ips = NULL;
1557                 }
1558                 /* grab a new shiny list of public ips from the node */
1559                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1560                         ctdb->nodes[j]->pnn, 
1561                         ctdb->nodes,
1562                         &ctdb->nodes[j]->public_ips)) {
1563                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1564                                 ctdb->nodes[j]->pnn));
1565                         goto again;
1566                 }
1567         }
1568
1569
1570         /* verify that all active nodes agree that we are the recmaster */
1571         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1572         case MONITOR_RECOVERY_NEEDED:
1573                 /* can not happen */
1574                 goto again;
1575         case MONITOR_ELECTION_NEEDED:
1576                 force_election(rec, mem_ctx, pnn, nodemap);
1577                 goto again;
1578         case MONITOR_OK:
1579                 break;
1580         case MONITOR_FAILED:
1581                 goto again;
1582         }
1583
1584
1585         /* verify that all active nodes are in normal mode 
1586            and not in recovery mode 
1587          */
1588         switch (verify_recmode(ctdb, nodemap)) {
1589         case MONITOR_RECOVERY_NEEDED:
1590                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1591                 goto again;
1592         case MONITOR_FAILED:
1593                 goto again;
1594         case MONITOR_ELECTION_NEEDED:
1595                 /* can not happen */
1596         case MONITOR_OK:
1597                 break;
1598         }
1599
1600
1601
1602         /* get the nodemap for all active remote nodes and verify
1603            they are the same as for this node
1604          */
1605         for (j=0; j<nodemap->num; j++) {
1606                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1607                         continue;
1608                 }
1609                 if (nodemap->nodes[j].pnn == pnn) {
1610                         continue;
1611                 }
1612
1613                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1614                                            mem_ctx, &remote_nodemap);
1615                 if (ret != 0) {
1616                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1617                                   nodemap->nodes[j].pnn));
1618                         goto again;
1619                 }
1620
1621                 /* if the nodes disagree on how many nodes there are
1622                    then this is a good reason to try recovery
1623                  */
1624                 if (remote_nodemap->num != nodemap->num) {
1625                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1626                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1627                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1628                         goto again;
1629                 }
1630
1631                 /* if the nodes disagree on which nodes exist and are
1632                    active, then that is also a good reason to do recovery
1633                  */
1634                 for (i=0;i<nodemap->num;i++) {
1635                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1636                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1637                                           nodemap->nodes[j].pnn, i, 
1638                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1639                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1640                                             vnnmap, nodemap->nodes[j].pnn);
1641                                 goto again;
1642                         }
1643                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1644                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1645                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1646                                           nodemap->nodes[j].pnn, i,
1647                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1648                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1649                                             vnnmap, nodemap->nodes[j].pnn);
1650                                 goto again;
1651                         }
1652                 }
1653
1654                 /* update our nodemap flags according to the other
1655                    server - this gets the NODE_FLAGS_DISABLED
1656                    flag. Note that the remote node is authoritative
1657                    for its flags (except CONNECTED, which we know
1658                    matches in this code) */
1659                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1660                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1661                         need_takeover_run = true;
1662                 }
1663         }
1664
1665
1666         /* there better be the same number of lmasters in the vnn map
1667            as there are active nodes or we will have to do a recovery
1668          */
1669         if (vnnmap->size != num_active) {
1670                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1671                           vnnmap->size, num_active));
1672                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1673                 goto again;
1674         }
1675
1676         /* verify that all active nodes in the nodemap also exist in 
1677            the vnnmap.
1678          */
1679         for (j=0; j<nodemap->num; j++) {
1680                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1681                         continue;
1682                 }
1683                 if (nodemap->nodes[j].pnn == pnn) {
1684                         continue;
1685                 }
1686
1687                 for (i=0; i<vnnmap->size; i++) {
1688                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1689                                 break;
1690                         }
1691                 }
1692                 if (i == vnnmap->size) {
1693                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1694                                   nodemap->nodes[j].pnn));
1695                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1696                         goto again;
1697                 }
1698         }
1699
1700         
1701         /* verify that all other nodes have the same vnnmap
1702            and are from the same generation
1703          */
1704         for (j=0; j<nodemap->num; j++) {
1705                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1706                         continue;
1707                 }
1708                 if (nodemap->nodes[j].pnn == pnn) {
1709                         continue;
1710                 }
1711
1712                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1713                                           mem_ctx, &remote_vnnmap);
1714                 if (ret != 0) {
1715                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1716                                   nodemap->nodes[j].pnn));
1717                         goto again;
1718                 }
1719
1720                 /* verify the vnnmap generation is the same */
1721                 if (vnnmap->generation != remote_vnnmap->generation) {
1722                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1723                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1724                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1725                         goto again;
1726                 }
1727
1728                 /* verify the vnnmap size is the same */
1729                 if (vnnmap->size != remote_vnnmap->size) {
1730                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1731                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1732                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1733                         goto again;
1734                 }
1735
1736                 /* verify the vnnmap is the same */
1737                 for (i=0;i<vnnmap->size;i++) {
1738                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1739                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1740                                           nodemap->nodes[j].pnn));
1741                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1742                                             vnnmap, nodemap->nodes[j].pnn);
1743                                 goto again;
1744                         }
1745                 }
1746         }
1747
1748         /* we might need to change who has what IP assigned */
1749         if (need_takeover_run && ctdb->vnn) {
1750                 ret = ctdb_takeover_run(ctdb, nodemap);
1751                 if (ret != 0) {
1752                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1753                 }
1754         }
1755
1756         goto again;
1757
1758 }
1759
1760 /*
1761   event handler for when the main ctdbd dies
1762  */
1763 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1764                                  uint16_t flags, void *private_data)
1765 {
1766         DEBUG(0,("recovery daemon parent died - exiting\n"));
1767         _exit(1);
1768 }
1769
1770
1771
1772 /*
1773   startup the recovery daemon as a child of the main ctdb daemon
1774  */
1775 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1776 {
1777         int ret;
1778         int fd[2];
1779         pid_t child;
1780
1781         if (pipe(fd) != 0) {
1782                 return -1;
1783         }
1784
1785         child = fork();
1786         if (child == -1) {
1787                 return -1;
1788         }
1789         
1790         if (child != 0) {
1791                 close(fd[0]);
1792                 return 0;
1793         }
1794
1795         close(fd[1]);
1796
1797         /* shutdown the transport */
1798         ctdb->methods->shutdown(ctdb);
1799
1800         /* get a new event context */
1801         talloc_free(ctdb->ev);
1802         ctdb->ev = event_context_init(ctdb);
1803
1804         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1805                      ctdb_recoverd_parent, &fd[0]);     
1806
1807         close(ctdb->daemon.sd);
1808         ctdb->daemon.sd = -1;
1809
1810         srandom(getpid() ^ time(NULL));
1811
1812         /* initialise ctdb */
1813         ret = ctdb_socket_connect(ctdb);
1814         if (ret != 0) {
1815                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1816                 exit(1);
1817         }
1818
1819         monitor_cluster(ctdb);
1820
1821         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1822         return -1;
1823 }