we are the culprit if we can't get the reclock
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
29
30
31 struct ban_state {
32         struct ctdb_recoverd *rec;
33         uint32_t banned_node;
34 };
35
36 /*
37   private state of recovery daemon
38  */
39 struct ctdb_recoverd {
40         struct ctdb_context *ctdb;
41         uint32_t last_culprit;
42         uint32_t culprit_counter;
43         struct timeval first_recover_time;
44         struct ban_state **banned_nodes;
45         struct timeval priority_time;
46         bool need_takeover_run;
47         bool need_recovery;
48 };
49
50 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
51 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
52
53 /*
54   unban a node
55  */
56 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
57 {
58         struct ctdb_context *ctdb = rec->ctdb;
59
60         if (!ctdb_validate_pnn(ctdb, pnn)) {
61                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
62                 return;
63         }
64
65         if (rec->banned_nodes[pnn] == NULL) {
66                 return;
67         }
68
69         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
70
71         talloc_free(rec->banned_nodes[pnn]);
72         rec->banned_nodes[pnn] = NULL;
73 }
74
75
76 /*
77   called when a ban has timed out
78  */
79 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
80 {
81         struct ban_state *state = talloc_get_type(p, struct ban_state);
82         struct ctdb_recoverd *rec = state->rec;
83         uint32_t pnn = state->banned_node;
84
85         DEBUG(0,("Node %u is now unbanned\n", pnn));
86         ctdb_unban_node(rec, pnn);
87 }
88
89 /*
90   ban a node for a period of time
91  */
92 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
93 {
94         struct ctdb_context *ctdb = rec->ctdb;
95
96         if (!ctdb_validate_pnn(ctdb, pnn)) {
97                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
98                 return;
99         }
100
101         if (pnn == ctdb->pnn) {
102                 DEBUG(0,("self ban - lowering our election priority\n"));
103                 /* banning ourselves - lower our election priority */
104                 rec->priority_time = timeval_current();
105         }
106
107         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
108
109         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
110         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
111
112         rec->banned_nodes[pnn]->rec = rec;
113         rec->banned_nodes[pnn]->banned_node = pnn;
114
115         if (ban_time != 0) {
116                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
117                                 timeval_current_ofs(ban_time, 0),
118                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
119         }
120 }
121
122 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
123
124
125 struct freeze_node_data {
126         uint32_t count;
127         enum monitor_result status;
128 };
129
130
131 static void freeze_node_callback(struct ctdb_client_control_state *state)
132 {
133         struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
134
135
136         /* one more node has responded to our freeze node*/
137         fndata->count--;
138
139         /* if we failed to freeze the node, we must trigger another recovery */
140         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
141                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
142                 fndata->status = MONITOR_RECOVERY_NEEDED;
143         }
144
145         return;
146 }
147
148
149
150 /* freeze all nodes */
151 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
152 {
153         struct freeze_node_data *fndata;
154         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
155         struct ctdb_client_control_state *state;
156         enum monitor_result status;
157         int j;
158         
159         fndata = talloc(mem_ctx, struct freeze_node_data);
160         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
161         fndata->count  = 0;
162         fndata->status = MONITOR_OK;
163
164         /* loop over all active nodes and send an async freeze call to 
165            them*/
166         for (j=0; j<nodemap->num; j++) {
167                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
168                         continue;
169                 }
170                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
171                                         CONTROL_TIMEOUT(), 
172                                         nodemap->nodes[j].pnn);
173                 if (state == NULL) {
174                         /* we failed to send the control, treat this as 
175                            an error and try again next iteration
176                         */                      
177                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
178                         talloc_free(mem_ctx);
179                         return MONITOR_RECOVERY_NEEDED;
180                 }
181
182                 /* set up the callback functions */
183                 state->async.fn = freeze_node_callback;
184                 state->async.private = fndata;
185
186                 /* one more control to wait for to complete */
187                 fndata->count++;
188         }
189
190
191         /* now wait for up to the maximum number of seconds allowed
192            or until all nodes we expect a response from has replied
193         */
194         while (fndata->count > 0) {
195                 event_loop_once(ctdb->ev);
196         }
197
198         status = fndata->status;
199         talloc_free(mem_ctx);
200         return status;
201 }
202
203
204 /*
205   change recovery mode on all nodes
206  */
207 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
208 {
209         int j, ret;
210
211         /* freeze all nodes */
212         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
213                 ret = freeze_all_nodes(ctdb, nodemap);
214                 if (ret != MONITOR_OK) {
215                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
216                         return -1;
217                 }
218         }
219
220
221         /* set recovery mode to active on all nodes */
222         for (j=0; j<nodemap->num; j++) {
223                 /* dont change it for nodes that are unavailable */
224                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
225                         continue;
226                 }
227
228                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
229                 if (ret != 0) {
230                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
231                         return -1;
232                 }
233
234                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
235                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
236                         if (ret != 0) {
237                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
238                                 return -1;
239                         }
240                 }
241         }
242
243         return 0;
244 }
245
246 /*
247   change recovery master on all node
248  */
249 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
250 {
251         int j, ret;
252
253         /* set recovery master to pnn on all nodes */
254         for (j=0; j<nodemap->num; j++) {
255                 /* dont change it for nodes that are unavailable */
256                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
257                         continue;
258                 }
259
260                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
261                 if (ret != 0) {
262                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
263                         return -1;
264                 }
265         }
266
267         return 0;
268 }
269
270
271 /*
272   ensure all other nodes have attached to any databases that we have
273  */
274 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
275                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
276 {
277         int i, j, db, ret;
278         struct ctdb_dbid_map *remote_dbmap;
279
280         /* verify that all other nodes have all our databases */
281         for (j=0; j<nodemap->num; j++) {
282                 /* we dont need to ourself ourselves */
283                 if (nodemap->nodes[j].pnn == pnn) {
284                         continue;
285                 }
286                 /* dont check nodes that are unavailable */
287                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
288                         continue;
289                 }
290
291                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
292                                          mem_ctx, &remote_dbmap);
293                 if (ret != 0) {
294                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
295                         return -1;
296                 }
297
298                 /* step through all local databases */
299                 for (db=0; db<dbmap->num;db++) {
300                         const char *name;
301
302
303                         for (i=0;i<remote_dbmap->num;i++) {
304                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
305                                         break;
306                                 }
307                         }
308                         /* the remote node already have this database */
309                         if (i!=remote_dbmap->num) {
310                                 continue;
311                         }
312                         /* ok so we need to create this database */
313                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
314                                             mem_ctx, &name);
315                         if (ret != 0) {
316                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
317                                 return -1;
318                         }
319                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
320                                            mem_ctx, name, dbmap->dbs[db].persistent);
321                         if (ret != 0) {
322                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
323                                 return -1;
324                         }
325                 }
326         }
327
328         return 0;
329 }
330
331
332 /*
333   ensure we are attached to any databases that anyone else is attached to
334  */
335 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
336                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
337 {
338         int i, j, db, ret;
339         struct ctdb_dbid_map *remote_dbmap;
340
341         /* verify that we have all database any other node has */
342         for (j=0; j<nodemap->num; j++) {
343                 /* we dont need to ourself ourselves */
344                 if (nodemap->nodes[j].pnn == pnn) {
345                         continue;
346                 }
347                 /* dont check nodes that are unavailable */
348                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
349                         continue;
350                 }
351
352                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
353                                          mem_ctx, &remote_dbmap);
354                 if (ret != 0) {
355                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
356                         return -1;
357                 }
358
359                 /* step through all databases on the remote node */
360                 for (db=0; db<remote_dbmap->num;db++) {
361                         const char *name;
362
363                         for (i=0;i<(*dbmap)->num;i++) {
364                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
365                                         break;
366                                 }
367                         }
368                         /* we already have this db locally */
369                         if (i!=(*dbmap)->num) {
370                                 continue;
371                         }
372                         /* ok so we need to create this database and
373                            rebuild dbmap
374                          */
375                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
376                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
377                         if (ret != 0) {
378                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
379                                           nodemap->nodes[j].pnn));
380                                 return -1;
381                         }
382                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
383                                            remote_dbmap->dbs[db].persistent);
384                         if (ret != 0) {
385                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
386                                 return -1;
387                         }
388                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
389                         if (ret != 0) {
390                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
391                                 return -1;
392                         }
393                 }
394         }
395
396         return 0;
397 }
398
399
400 /*
401   pull all the remote database contents into ours
402  */
403 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
404                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
405 {
406         int i, j, ret;
407
408         /* pull all records from all other nodes across onto this node
409            (this merges based on rsn)
410         */
411         for (i=0;i<dbmap->num;i++) {
412                 for (j=0; j<nodemap->num; j++) {
413                         /* we dont need to merge with ourselves */
414                         if (nodemap->nodes[j].pnn == pnn) {
415                                 continue;
416                         }
417                         /* dont merge from nodes that are unavailable */
418                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
419                                 continue;
420                         }
421                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
422                                                pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
423                         if (ret != 0) {
424                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
425                                           nodemap->nodes[j].pnn, pnn));
426                                 return -1;
427                         }
428                 }
429         }
430
431         return 0;
432 }
433
434
435 /*
436   change the dmaster on all databases to point to us
437  */
438 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
439                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
440 {
441         int i, j, ret;
442
443         /* update dmaster to point to this node for all databases/nodes */
444         for (i=0;i<dbmap->num;i++) {
445                 for (j=0; j<nodemap->num; j++) {
446                         /* dont repoint nodes that are unavailable */
447                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
448                                 continue;
449                         }
450                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
451                                                    ctdb, dbmap->dbs[i].dbid, pnn);
452                         if (ret != 0) {
453                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
454                                           nodemap->nodes[j].pnn, dbmap->dbs[i].dbid));
455                                 return -1;
456                         }
457                 }
458         }
459
460         return 0;
461 }
462
463
464 /*
465   update flags on all active nodes
466  */
467 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
468 {
469         int i;
470         for (i=0;i<nodemap->num;i++) {
471                 struct ctdb_node_flag_change c;
472                 TDB_DATA data;
473
474                 c.pnn = nodemap->nodes[i].pnn;
475                 c.old_flags = nodemap->nodes[i].flags;
476                 c.new_flags = nodemap->nodes[i].flags;
477
478                 data.dptr = (uint8_t *)&c;
479                 data.dsize = sizeof(c);
480
481                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
482                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
483
484         }
485         return 0;
486 }
487
488 /*
489   vacuum one database
490  */
491 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
492 {
493         uint64_t max_rsn;
494         int ret, i;
495
496         /* find max rsn on our local node for this db */
497         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
498         if (ret != 0) {
499                 return -1;
500         }
501
502         /* set rsn on non-empty records to max_rsn+1 */
503         for (i=0;i<nodemap->num;i++) {
504                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
505                         continue;
506                 }
507                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
508                                                  db_id, max_rsn+1);
509                 if (ret != 0) {
510                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
511                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
512                         return -1;
513                 }
514         }
515
516         /* delete records with rsn < max_rsn+1 on all nodes */
517         for (i=0;i<nodemap->num;i++) {
518                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
519                         continue;
520                 }
521                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
522                                                  db_id, max_rsn+1);
523                 if (ret != 0) {
524                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
525                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
526                         return -1;
527                 }
528         }
529
530
531         return 0;
532 }
533
534
535 /*
536   vacuum all attached databases
537  */
538 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
539                                 struct ctdb_dbid_map *dbmap)
540 {
541         int i;
542
543         /* update dmaster to point to this node for all databases/nodes */
544         for (i=0;i<dbmap->num;i++) {
545                 if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
546                         return -1;
547                 }
548         }
549         return 0;
550 }
551
552
553 /*
554   push out all our database contents to all other nodes
555  */
556 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
557                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
558 {
559         int i, j, ret;
560
561         /* push all records out to the nodes again */
562         for (i=0;i<dbmap->num;i++) {
563                 for (j=0; j<nodemap->num; j++) {
564                         /* we dont need to push to ourselves */
565                         if (nodemap->nodes[j].pnn == pnn) {
566                                 continue;
567                         }
568                         /* dont push to nodes that are unavailable */
569                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
570                                 continue;
571                         }
572                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
573                                                dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
574                         if (ret != 0) {
575                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
576                                           pnn, nodemap->nodes[j].pnn));
577                                 return -1;
578                         }
579                 }
580         }
581
582         return 0;
583 }
584
585
586 /*
587   ensure all nodes have the same vnnmap we do
588  */
589 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
590                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
591 {
592         int j, ret;
593
594         /* push the new vnn map out to all the nodes */
595         for (j=0; j<nodemap->num; j++) {
596                 /* dont push to nodes that are unavailable */
597                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
598                         continue;
599                 }
600
601                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
602                 if (ret != 0) {
603                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
604                         return -1;
605                 }
606         }
607
608         return 0;
609 }
610
611
612 /*
613   handler for when the admin bans a node
614 */
615 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
616                         TDB_DATA data, void *private_data)
617 {
618         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
619         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
620         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
621         uint32_t recmaster;
622         int ret;
623
624         if (data.dsize != sizeof(*b)) {
625                 DEBUG(0,("Bad data in ban_handler\n"));
626                 talloc_free(mem_ctx);
627                 return;
628         }
629
630         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
631         if (ret != 0) {
632                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
633                 talloc_free(mem_ctx);
634                 return;
635         }
636
637         if (recmaster != ctdb->pnn) {
638                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
639                 talloc_free(mem_ctx);
640                 return;
641         }
642
643         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
644                  b->pnn, b->ban_time));
645         ctdb_ban_node(rec, b->pnn, b->ban_time);
646         talloc_free(mem_ctx);
647 }
648
649 /*
650   handler for when the admin unbans a node
651 */
652 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
653                           TDB_DATA data, void *private_data)
654 {
655         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
656         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
657         uint32_t pnn;
658         int ret;
659         uint32_t recmaster;
660
661         if (data.dsize != sizeof(uint32_t)) {
662                 DEBUG(0,("Bad data in unban_handler\n"));
663                 talloc_free(mem_ctx);
664                 return;
665         }
666         pnn = *(uint32_t *)data.dptr;
667
668         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
669         if (ret != 0) {
670                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
671                 talloc_free(mem_ctx);
672                 return;
673         }
674
675         if (recmaster != ctdb->pnn) {
676                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
677                 talloc_free(mem_ctx);
678                 return;
679         }
680
681         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
682         ctdb_unban_node(rec, pnn);
683         talloc_free(mem_ctx);
684 }
685
686
687
688 /*
689   called when ctdb_wait_timeout should finish
690  */
691 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
692                               struct timeval yt, void *p)
693 {
694         uint32_t *timed_out = (uint32_t *)p;
695         (*timed_out) = 1;
696 }
697
698 /*
699   wait for a given number of seconds
700  */
701 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
702 {
703         uint32_t timed_out = 0;
704         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
705         while (!timed_out) {
706                 event_loop_once(ctdb->ev);
707         }
708 }
709
710 /* Create a new random generation ip. 
711    The generation id can not be the INVALID_GENERATION id
712 */
713 static uint32_t new_generation(void)
714 {
715         uint32_t generation;
716
717         while (1) {
718                 generation = random();
719
720                 if (generation != INVALID_GENERATION) {
721                         break;
722                 }
723         }
724
725         return generation;
726 }
727
728 /*
729   remember the trouble maker
730  */
731 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
732 {
733         struct ctdb_context *ctdb = rec->ctdb;
734
735         if (rec->last_culprit != culprit ||
736             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
737                 /* either a new node is the culprit, or we've decide to forgive them */
738                 rec->last_culprit = culprit;
739                 rec->first_recover_time = timeval_current();
740                 rec->culprit_counter = 0;
741         }
742         rec->culprit_counter++;
743 }
744                 
745 /*
746   we are the recmaster, and recovery is needed - start a recovery run
747  */
748 static int do_recovery(struct ctdb_recoverd *rec, 
749                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
750                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
751                        uint32_t culprit)
752 {
753         struct ctdb_context *ctdb = rec->ctdb;
754         int i, j, ret;
755         uint32_t generation;
756         struct ctdb_dbid_map *dbmap;
757
758         /* if recovery fails, force it again */
759         rec->need_recovery = true;
760
761         ctdb_set_culprit(rec, culprit);
762
763         if (rec->culprit_counter > 2*nodemap->num) {
764                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
765                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
766                          ctdb->tunable.recovery_ban_period));
767                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
768         }
769
770         if (!ctdb_recovery_lock(ctdb, true)) {
771                 ctdb_set_culprit(rec, pnn);
772                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
773                 return -1;
774         }
775
776         /* set recovery mode to active on all nodes */
777         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
778         if (ret!=0) {
779                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
780                 return -1;
781         }
782
783         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
784
785         /* pick a new generation number */
786         generation = new_generation();
787
788         /* change the vnnmap on this node to use the new generation 
789            number but not on any other nodes.
790            this guarantees that if we abort the recovery prematurely
791            for some reason (a node stops responding?)
792            that we can just return immediately and we will reenter
793            recovery shortly again.
794            I.e. we deliberately leave the cluster with an inconsistent
795            generation id to allow us to abort recovery at any stage and
796            just restart it from scratch.
797          */
798         vnnmap->generation = generation;
799         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
800         if (ret != 0) {
801                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
802                 return -1;
803         }
804
805         /* get a list of all databases */
806         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
807         if (ret != 0) {
808                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
809                 return -1;
810         }
811
812
813
814         /* verify that all other nodes have all our databases */
815         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
816         if (ret != 0) {
817                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
818                 return -1;
819         }
820
821         /* verify that we have all the databases any other node has */
822         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
823         if (ret != 0) {
824                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
825                 return -1;
826         }
827
828
829
830         /* verify that all other nodes have all our databases */
831         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
832         if (ret != 0) {
833                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
834                 return -1;
835         }
836
837
838         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
839
840         /* pull all remote databases onto the local node */
841         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
842         if (ret != 0) {
843                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
844                 return -1;
845         }
846
847         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
848
849         /* push all local databases to the remote nodes */
850         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
851         if (ret != 0) {
852                 DEBUG(0, (__location__ " Unable to push local databases\n"));
853                 return -1;
854         }
855
856         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
857
858         /* build a new vnn map with all the currently active and
859            unbanned nodes */
860         generation = new_generation();
861         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
862         CTDB_NO_MEMORY(ctdb, vnnmap);
863         vnnmap->generation = generation;
864         vnnmap->size = num_active;
865         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
866         for (i=j=0;i<nodemap->num;i++) {
867                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
868                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
869                 }
870         }
871
872
873
874         /* update to the new vnnmap on all nodes */
875         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
876         if (ret != 0) {
877                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
878                 return -1;
879         }
880
881         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
882
883         /* update recmaster to point to us for all nodes */
884         ret = set_recovery_master(ctdb, nodemap, pnn);
885         if (ret!=0) {
886                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
887                 return -1;
888         }
889
890         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
891
892         /* repoint all local and remote database records to the local
893            node as being dmaster
894          */
895         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
896         if (ret != 0) {
897                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
898                 return -1;
899         }
900
901         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
902
903         /*
904           update all nodes to have the same flags that we have
905          */
906         ret = update_flags_on_all_nodes(ctdb, nodemap);
907         if (ret != 0) {
908                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
909                 return -1;
910         }
911         
912         DEBUG(1, (__location__ " Recovery - updated flags\n"));
913
914         /*
915           run a vacuum operation on empty records
916          */
917         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
918         if (ret != 0) {
919                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
920                 return -1;
921         }
922
923         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
924
925         /*
926           if enabled, tell nodes to takeover their public IPs
927          */
928         if (ctdb->vnn) {
929                 rec->need_takeover_run = false;
930                 ret = ctdb_takeover_run(ctdb, nodemap);
931                 if (ret != 0) {
932                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
933                         return -1;
934                 }
935                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
936         }
937
938         for (i=0;i<dbmap->num;i++) {
939                 DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
940         }
941
942         /* disable recovery mode */
943         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
944         if (ret!=0) {
945                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
946                 return -1;
947         }
948
949         /* send a message to all clients telling them that the cluster 
950            has been reconfigured */
951         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
952
953         DEBUG(0, (__location__ " Recovery complete\n"));
954
955         rec->need_recovery = false;
956
957         /* We just finished a recovery successfully. 
958            We now wait for rerecovery_timeout before we allow 
959            another recovery to take place.
960         */
961         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
962         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
963         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
964
965         return 0;
966 }
967
968
969 /*
970   elections are won by first checking the number of connected nodes, then
971   the priority time, then the pnn
972  */
973 struct election_message {
974         uint32_t num_connected;
975         struct timeval priority_time;
976         uint32_t pnn;
977 };
978
979 /*
980   form this nodes election data
981  */
982 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
983 {
984         int ret, i;
985         struct ctdb_node_map *nodemap;
986         struct ctdb_context *ctdb = rec->ctdb;
987
988         ZERO_STRUCTP(em);
989
990         em->pnn = rec->ctdb->pnn;
991         em->priority_time = rec->priority_time;
992
993         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
994         if (ret != 0) {
995                 return;
996         }
997
998         for (i=0;i<nodemap->num;i++) {
999                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1000                         em->num_connected++;
1001                 }
1002         }
1003         talloc_free(nodemap);
1004 }
1005
1006 /*
1007   see if the given election data wins
1008  */
1009 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1010 {
1011         struct election_message myem;
1012         int cmp;
1013
1014         ctdb_election_data(rec, &myem);
1015
1016         /* try to use the most connected node */
1017         cmp = (int)myem.num_connected - (int)em->num_connected;
1018
1019         /* then the longest running node */
1020         if (cmp == 0) {
1021                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1022         }
1023
1024         if (cmp == 0) {
1025                 cmp = (int)myem.pnn - (int)em->pnn;
1026         }
1027
1028         return cmp > 0;
1029 }
1030
1031 /*
1032   send out an election request
1033  */
1034 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1035 {
1036         int ret;
1037         TDB_DATA election_data;
1038         struct election_message emsg;
1039         uint64_t srvid;
1040         struct ctdb_context *ctdb = rec->ctdb;
1041         
1042         srvid = CTDB_SRVID_RECOVERY;
1043
1044         ctdb_election_data(rec, &emsg);
1045
1046         election_data.dsize = sizeof(struct election_message);
1047         election_data.dptr  = (unsigned char *)&emsg;
1048
1049
1050         /* first we assume we will win the election and set 
1051            recoverymaster to be ourself on the current node
1052          */
1053         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1054         if (ret != 0) {
1055                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1056                 return -1;
1057         }
1058
1059
1060         /* send an election message to all active nodes */
1061         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1062
1063         return 0;
1064 }
1065
1066 /*
1067   this function will unban all nodes in the cluster
1068 */
1069 static void unban_all_nodes(struct ctdb_context *ctdb)
1070 {
1071         int ret, i;
1072         struct ctdb_node_map *nodemap;
1073         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1074         
1075         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1076         if (ret != 0) {
1077                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1078                 return;
1079         }
1080
1081         for (i=0;i<nodemap->num;i++) {
1082                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1083                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1084                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1085                 }
1086         }
1087
1088         talloc_free(tmp_ctx);
1089 }
1090
1091 /*
1092   handler for recovery master elections
1093 */
1094 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1095                              TDB_DATA data, void *private_data)
1096 {
1097         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1098         int ret;
1099         struct election_message *em = (struct election_message *)data.dptr;
1100         TALLOC_CTX *mem_ctx;
1101
1102         mem_ctx = talloc_new(ctdb);
1103
1104         /* someone called an election. check their election data
1105            and if we disagree and we would rather be the elected node, 
1106            send a new election message to all other nodes
1107          */
1108         if (ctdb_election_win(rec, em)) {
1109                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1110                 if (ret!=0) {
1111                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1112                 }
1113                 talloc_free(mem_ctx);
1114                 /*unban_all_nodes(ctdb);*/
1115                 return;
1116         }
1117
1118         /* release the recmaster lock */
1119         if (em->pnn != ctdb->pnn &&
1120             ctdb->recovery_lock_fd != -1) {
1121                 close(ctdb->recovery_lock_fd);
1122                 ctdb->recovery_lock_fd = -1;
1123                 unban_all_nodes(ctdb);
1124         }
1125
1126         /* ok, let that guy become recmaster then */
1127         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1128         if (ret != 0) {
1129                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1130                 talloc_free(mem_ctx);
1131                 return;
1132         }
1133
1134         /* release any bans */
1135         rec->last_culprit = (uint32_t)-1;
1136         talloc_free(rec->banned_nodes);
1137         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1138         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1139
1140         talloc_free(mem_ctx);
1141         return;
1142 }
1143
1144
1145 /*
1146   force the start of the election process
1147  */
1148 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1149                            struct ctdb_node_map *nodemap)
1150 {
1151         int ret;
1152         struct ctdb_context *ctdb = rec->ctdb;
1153
1154         /* set all nodes to recovery mode to stop all internode traffic */
1155         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1156         if (ret!=0) {
1157                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1158                 return;
1159         }
1160         
1161         ret = send_election_request(rec, mem_ctx, pnn);
1162         if (ret!=0) {
1163                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1164                 return;
1165         }
1166
1167         /* wait for a few seconds to collect all responses */
1168         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1169 }
1170
1171
1172
1173 /*
1174   handler for when a node changes its flags
1175 */
1176 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1177                             TDB_DATA data, void *private_data)
1178 {
1179         int ret;
1180         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1181         struct ctdb_node_map *nodemap=NULL;
1182         TALLOC_CTX *tmp_ctx;
1183         uint32_t changed_flags;
1184         int i;
1185         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1186
1187         if (data.dsize != sizeof(*c)) {
1188                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1189                 return;
1190         }
1191
1192         tmp_ctx = talloc_new(ctdb);
1193         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1194
1195         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1196
1197         for (i=0;i<nodemap->num;i++) {
1198                 if (nodemap->nodes[i].pnn == c->pnn) break;
1199         }
1200
1201         if (i == nodemap->num) {
1202                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1203                 talloc_free(tmp_ctx);
1204                 return;
1205         }
1206
1207         changed_flags = c->old_flags ^ c->new_flags;
1208
1209         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1210            This flag is handled locally based on whether the local node
1211            can communicate with the node or not.
1212         */
1213         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1214         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1215                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1216         }
1217
1218         if (nodemap->nodes[i].flags != c->new_flags) {
1219                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1220         }
1221
1222         nodemap->nodes[i].flags = c->new_flags;
1223
1224         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1225                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1226
1227         if (ret == 0) {
1228                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1229                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1230         }
1231         
1232         if (ret == 0 &&
1233             ctdb->recovery_master == ctdb->pnn &&
1234             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1235             ctdb->vnn) {
1236                 /* Only do the takeover run if the perm disabled or unhealthy
1237                    flags changed since these will cause an ip failover but not
1238                    a recovery.
1239                    If the node became disconnected or banned this will also
1240                    lead to an ip address failover but that is handled 
1241                    during recovery
1242                 */
1243                 if (changed_flags & NODE_FLAGS_DISABLED) {
1244                         rec->need_takeover_run = true;
1245                 }
1246         }
1247
1248         talloc_free(tmp_ctx);
1249 }
1250
1251
1252
1253 struct verify_recmode_normal_data {
1254         uint32_t count;
1255         enum monitor_result status;
1256 };
1257
1258 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1259 {
1260         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
1261
1262
1263         /* one more node has responded with recmode data*/
1264         rmdata->count--;
1265
1266         /* if we failed to get the recmode, then return an error and let
1267            the main loop try again.
1268         */
1269         if (state->state != CTDB_CONTROL_DONE) {
1270                 if (rmdata->status == MONITOR_OK) {
1271                         rmdata->status = MONITOR_FAILED;
1272                 }
1273                 return;
1274         }
1275
1276         /* if we got a response, then the recmode will be stored in the
1277            status field
1278         */
1279         if (state->status != CTDB_RECOVERY_NORMAL) {
1280                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1281                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1282         }
1283
1284         return;
1285 }
1286
1287
1288 /* verify that all nodes are in normal recovery mode */
1289 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1290 {
1291         struct verify_recmode_normal_data *rmdata;
1292         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1293         struct ctdb_client_control_state *state;
1294         enum monitor_result status;
1295         int j;
1296         
1297         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1298         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1299         rmdata->count  = 0;
1300         rmdata->status = MONITOR_OK;
1301
1302         /* loop over all active nodes and send an async getrecmode call to 
1303            them*/
1304         for (j=0; j<nodemap->num; j++) {
1305                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1306                         continue;
1307                 }
1308                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1309                                         CONTROL_TIMEOUT(), 
1310                                         nodemap->nodes[j].pnn);
1311                 if (state == NULL) {
1312                         /* we failed to send the control, treat this as 
1313                            an error and try again next iteration
1314                         */                      
1315                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1316                         talloc_free(mem_ctx);
1317                         return MONITOR_FAILED;
1318                 }
1319
1320                 /* set up the callback functions */
1321                 state->async.fn = verify_recmode_normal_callback;
1322                 state->async.private = rmdata;
1323
1324                 /* one more control to wait for to complete */
1325                 rmdata->count++;
1326         }
1327
1328
1329         /* now wait for up to the maximum number of seconds allowed
1330            or until all nodes we expect a response from has replied
1331         */
1332         while (rmdata->count > 0) {
1333                 event_loop_once(ctdb->ev);
1334         }
1335
1336         status = rmdata->status;
1337         talloc_free(mem_ctx);
1338         return status;
1339 }
1340
1341
1342 struct verify_recmaster_data {
1343         uint32_t count;
1344         uint32_t pnn;
1345         enum monitor_result status;
1346 };
1347
1348 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1349 {
1350         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
1351
1352
1353         /* one more node has responded with recmaster data*/
1354         rmdata->count--;
1355
1356         /* if we failed to get the recmaster, then return an error and let
1357            the main loop try again.
1358         */
1359         if (state->state != CTDB_CONTROL_DONE) {
1360                 if (rmdata->status == MONITOR_OK) {
1361                         rmdata->status = MONITOR_FAILED;
1362                 }
1363                 return;
1364         }
1365
1366         /* if we got a response, then the recmaster will be stored in the
1367            status field
1368         */
1369         if (state->status != rmdata->pnn) {
1370                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1371                 rmdata->status = MONITOR_ELECTION_NEEDED;
1372         }
1373
1374         return;
1375 }
1376
1377
1378 /* verify that all nodes agree that we are the recmaster */
1379 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1380 {
1381         struct verify_recmaster_data *rmdata;
1382         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1383         struct ctdb_client_control_state *state;
1384         enum monitor_result status;
1385         int j;
1386         
1387         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1388         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1389         rmdata->count  = 0;
1390         rmdata->pnn    = pnn;
1391         rmdata->status = MONITOR_OK;
1392
1393         /* loop over all active nodes and send an async getrecmaster call to 
1394            them*/
1395         for (j=0; j<nodemap->num; j++) {
1396                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1397                         continue;
1398                 }
1399                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1400                                         CONTROL_TIMEOUT(),
1401                                         nodemap->nodes[j].pnn);
1402                 if (state == NULL) {
1403                         /* we failed to send the control, treat this as 
1404                            an error and try again next iteration
1405                         */                      
1406                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1407                         talloc_free(mem_ctx);
1408                         return MONITOR_FAILED;
1409                 }
1410
1411                 /* set up the callback functions */
1412                 state->async.fn = verify_recmaster_callback;
1413                 state->async.private = rmdata;
1414
1415                 /* one more control to wait for to complete */
1416                 rmdata->count++;
1417         }
1418
1419
1420         /* now wait for up to the maximum number of seconds allowed
1421            or until all nodes we expect a response from has replied
1422         */
1423         while (rmdata->count > 0) {
1424                 event_loop_once(ctdb->ev);
1425         }
1426
1427         status = rmdata->status;
1428         talloc_free(mem_ctx);
1429         return status;
1430 }
1431
1432
1433 /*
1434   the main monitoring loop
1435  */
1436 static void monitor_cluster(struct ctdb_context *ctdb)
1437 {
1438         uint32_t pnn, num_active, recmaster;
1439         TALLOC_CTX *mem_ctx=NULL;
1440         struct ctdb_node_map *nodemap=NULL;
1441         struct ctdb_node_map *remote_nodemap=NULL;
1442         struct ctdb_vnn_map *vnnmap=NULL;
1443         struct ctdb_vnn_map *remote_vnnmap=NULL;
1444         int i, j, ret;
1445         struct ctdb_recoverd *rec;
1446         struct ctdb_all_public_ips *ips;
1447
1448         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1449         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1450
1451         rec->ctdb = ctdb;
1452         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1453         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1454
1455         rec->priority_time = timeval_current();
1456
1457         /* register a message port for recovery elections */
1458         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1459
1460         /* and one for when nodes are disabled/enabled */
1461         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1462
1463         /* and one for when nodes are banned */
1464         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1465
1466         /* and one for when nodes are unbanned */
1467         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1468         
1469 again:
1470         if (mem_ctx) {
1471                 talloc_free(mem_ctx);
1472                 mem_ctx = NULL;
1473         }
1474         mem_ctx = talloc_new(ctdb);
1475         if (!mem_ctx) {
1476                 DEBUG(0,("Failed to create temporary context\n"));
1477                 exit(-1);
1478         }
1479
1480         /* we only check for recovery once every second */
1481         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1482
1483         /* get relevant tunables */
1484         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1485         if (ret != 0) {
1486                 DEBUG(0,("Failed to get tunables - retrying\n"));
1487                 goto again;
1488         }
1489
1490         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1491         if (pnn == (uint32_t)-1) {
1492                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1493                 goto again;
1494         }
1495
1496         /* get the vnnmap */
1497         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1498         if (ret != 0) {
1499                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1500                 goto again;
1501         }
1502
1503
1504         /* get number of nodes */
1505         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1506         if (ret != 0) {
1507                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1508                 goto again;
1509         }
1510
1511
1512         /* count how many active nodes there are */
1513         num_active = 0;
1514         for (i=0; i<nodemap->num; i++) {
1515                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1516                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1517                 } else {
1518                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1519                 }
1520                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1521                         num_active++;
1522                 }
1523         }
1524
1525
1526         /* check which node is the recovery master */
1527         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1528         if (ret != 0) {
1529                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1530                 goto again;
1531         }
1532
1533         if (recmaster == (uint32_t)-1) {
1534                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1535                 force_election(rec, mem_ctx, pnn, nodemap);
1536                 goto again;
1537         }
1538         
1539         /* verify that the recmaster node is still active */
1540         for (j=0; j<nodemap->num; j++) {
1541                 if (nodemap->nodes[j].pnn==recmaster) {
1542                         break;
1543                 }
1544         }
1545
1546         if (j == nodemap->num) {
1547                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1548                 force_election(rec, mem_ctx, pnn, nodemap);
1549                 goto again;
1550         }
1551
1552         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1553                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1554                 force_election(rec, mem_ctx, pnn, nodemap);
1555                 goto again;
1556         }
1557
1558         /* verify that the public ip address allocation is consistent */
1559         if (ctdb->vnn != NULL) {
1560                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
1561                 if (ret != 0) {
1562                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
1563                         goto again;
1564                 }
1565                 for (j=0; j<ips->num; j++) {
1566                         /* verify that we have the ip addresses we should have
1567                            and we dont have ones we shouldnt have.
1568                            if we find an inconsistency we set recmode to
1569                            active on the local node and wait for the recmaster
1570                            to do a full blown recovery
1571                         */
1572                         if (ips->ips[j].pnn == pnn) {
1573                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
1574                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1575                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1576                                         if (ret != 0) {
1577                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1578                                                 goto again;
1579                                         }
1580                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1581                                         if (ret != 0) {
1582                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1583                                                 goto again;
1584                                         }
1585                                 }
1586                         } else {
1587                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
1588                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1589                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1590                                         if (ret != 0) {
1591                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1592                                                 goto again;
1593                                         }
1594                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1595                                         if (ret != 0) {
1596                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1597                                                 goto again;
1598                                         }
1599                                 }
1600                         }
1601                 }
1602         }
1603
1604         /* if we are not the recmaster then we do not need to check
1605            if recovery is needed
1606          */
1607         if (pnn != recmaster) {
1608                 goto again;
1609         }
1610
1611
1612         /* update the list of public ips that a node can handle for
1613            all connected nodes
1614         */
1615         for (j=0; j<nodemap->num; j++) {
1616                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1617                         continue;
1618                 }
1619                 /* release any existing data */
1620                 if (ctdb->nodes[j]->public_ips) {
1621                         talloc_free(ctdb->nodes[j]->public_ips);
1622                         ctdb->nodes[j]->public_ips = NULL;
1623                 }
1624                 /* grab a new shiny list of public ips from the node */
1625                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1626                         ctdb->nodes[j]->pnn, 
1627                         ctdb->nodes,
1628                         &ctdb->nodes[j]->public_ips)) {
1629                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1630                                 ctdb->nodes[j]->pnn));
1631                         goto again;
1632                 }
1633         }
1634
1635
1636         /* verify that all active nodes agree that we are the recmaster */
1637         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1638         case MONITOR_RECOVERY_NEEDED:
1639                 /* can not happen */
1640                 goto again;
1641         case MONITOR_ELECTION_NEEDED:
1642                 force_election(rec, mem_ctx, pnn, nodemap);
1643                 goto again;
1644         case MONITOR_OK:
1645                 break;
1646         case MONITOR_FAILED:
1647                 goto again;
1648         }
1649
1650
1651         if (rec->need_recovery) {
1652                 /* a previous recovery didn't finish */
1653                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1654                 goto again;             
1655         }
1656
1657         /* verify that all active nodes are in normal mode 
1658            and not in recovery mode 
1659          */
1660         switch (verify_recmode(ctdb, nodemap)) {
1661         case MONITOR_RECOVERY_NEEDED:
1662                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1663                 goto again;
1664         case MONITOR_FAILED:
1665                 goto again;
1666         case MONITOR_ELECTION_NEEDED:
1667                 /* can not happen */
1668         case MONITOR_OK:
1669                 break;
1670         }
1671
1672
1673
1674         /* get the nodemap for all active remote nodes and verify
1675            they are the same as for this node
1676          */
1677         for (j=0; j<nodemap->num; j++) {
1678                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1679                         continue;
1680                 }
1681                 if (nodemap->nodes[j].pnn == pnn) {
1682                         continue;
1683                 }
1684
1685                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1686                                            mem_ctx, &remote_nodemap);
1687                 if (ret != 0) {
1688                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1689                                   nodemap->nodes[j].pnn));
1690                         goto again;
1691                 }
1692
1693                 /* if the nodes disagree on how many nodes there are
1694                    then this is a good reason to try recovery
1695                  */
1696                 if (remote_nodemap->num != nodemap->num) {
1697                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1698                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1699                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1700                         goto again;
1701                 }
1702
1703                 /* if the nodes disagree on which nodes exist and are
1704                    active, then that is also a good reason to do recovery
1705                  */
1706                 for (i=0;i<nodemap->num;i++) {
1707                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1708                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1709                                           nodemap->nodes[j].pnn, i, 
1710                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1711                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1712                                             vnnmap, nodemap->nodes[j].pnn);
1713                                 goto again;
1714                         }
1715                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1716                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1717                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1718                                           nodemap->nodes[j].pnn, i,
1719                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1720                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1721                                             vnnmap, nodemap->nodes[j].pnn);
1722                                 goto again;
1723                         }
1724                 }
1725
1726                 /* update our nodemap flags according to the other
1727                    server - this gets the NODE_FLAGS_DISABLED
1728                    flag. Note that the remote node is authoritative
1729                    for its flags (except CONNECTED, which we know
1730                    matches in this code) */
1731                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1732                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1733                         rec->need_takeover_run = true;
1734                 }
1735         }
1736
1737
1738         /* there better be the same number of lmasters in the vnn map
1739            as there are active nodes or we will have to do a recovery
1740          */
1741         if (vnnmap->size != num_active) {
1742                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1743                           vnnmap->size, num_active));
1744                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1745                 goto again;
1746         }
1747
1748         /* verify that all active nodes in the nodemap also exist in 
1749            the vnnmap.
1750          */
1751         for (j=0; j<nodemap->num; j++) {
1752                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1753                         continue;
1754                 }
1755                 if (nodemap->nodes[j].pnn == pnn) {
1756                         continue;
1757                 }
1758
1759                 for (i=0; i<vnnmap->size; i++) {
1760                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1761                                 break;
1762                         }
1763                 }
1764                 if (i == vnnmap->size) {
1765                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1766                                   nodemap->nodes[j].pnn));
1767                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1768                         goto again;
1769                 }
1770         }
1771
1772         
1773         /* verify that all other nodes have the same vnnmap
1774            and are from the same generation
1775          */
1776         for (j=0; j<nodemap->num; j++) {
1777                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1778                         continue;
1779                 }
1780                 if (nodemap->nodes[j].pnn == pnn) {
1781                         continue;
1782                 }
1783
1784                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1785                                           mem_ctx, &remote_vnnmap);
1786                 if (ret != 0) {
1787                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1788                                   nodemap->nodes[j].pnn));
1789                         goto again;
1790                 }
1791
1792                 /* verify the vnnmap generation is the same */
1793                 if (vnnmap->generation != remote_vnnmap->generation) {
1794                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1795                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1796                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1797                         goto again;
1798                 }
1799
1800                 /* verify the vnnmap size is the same */
1801                 if (vnnmap->size != remote_vnnmap->size) {
1802                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1803                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1804                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1805                         goto again;
1806                 }
1807
1808                 /* verify the vnnmap is the same */
1809                 for (i=0;i<vnnmap->size;i++) {
1810                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1811                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1812                                           nodemap->nodes[j].pnn));
1813                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1814                                             vnnmap, nodemap->nodes[j].pnn);
1815                                 goto again;
1816                         }
1817                 }
1818         }
1819
1820         /* we might need to change who has what IP assigned */
1821         if (rec->need_takeover_run) {
1822                 rec->need_takeover_run = false;
1823                 ret = ctdb_takeover_run(ctdb, nodemap);
1824                 if (ret != 0) {
1825                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1826                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1827                                     vnnmap, nodemap->nodes[j].pnn);
1828                 }
1829         }
1830
1831         goto again;
1832
1833 }
1834
1835 /*
1836   event handler for when the main ctdbd dies
1837  */
1838 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1839                                  uint16_t flags, void *private_data)
1840 {
1841         DEBUG(0,("recovery daemon parent died - exiting\n"));
1842         _exit(1);
1843 }
1844
1845 /*
1846   startup the recovery daemon as a child of the main ctdb daemon
1847  */
1848 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1849 {
1850         int ret;
1851         int fd[2];
1852         pid_t child;
1853
1854         if (pipe(fd) != 0) {
1855                 return -1;
1856         }
1857
1858         child = fork();
1859         if (child == -1) {
1860                 return -1;
1861         }
1862         
1863         if (child != 0) {
1864                 close(fd[0]);
1865                 return 0;
1866         }
1867
1868         close(fd[1]);
1869
1870         /* shutdown the transport */
1871         ctdb->methods->shutdown(ctdb);
1872
1873         /* get a new event context */
1874         talloc_free(ctdb->ev);
1875         ctdb->ev = event_context_init(ctdb);
1876
1877         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1878                      ctdb_recoverd_parent, &fd[0]);     
1879
1880         close(ctdb->daemon.sd);
1881         ctdb->daemon.sd = -1;
1882
1883         srandom(getpid() ^ time(NULL));
1884
1885         /* initialise ctdb */
1886         ret = ctdb_socket_connect(ctdb);
1887         if (ret != 0) {
1888                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1889                 exit(1);
1890         }
1891
1892         monitor_cluster(ctdb);
1893
1894         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1895         return -1;
1896 }