recoverd: Move disabling of IP checks into do_takeover_run()
[amitay/samba.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         bool takeover_run_in_progress;
71         TALLOC_CTX *ip_check_disable_ctx;
72         struct ctdb_control_get_ifaces *ifaces;
73         TALLOC_CTX *deferred_rebalance_ctx;
74 };
75
76 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
77 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
78
79 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
80
81 /*
82   ban a node for a period of time
83  */
84 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
85 {
86         int ret;
87         struct ctdb_context *ctdb = rec->ctdb;
88         struct ctdb_ban_time bantime;
89        
90         if (!ctdb_validate_pnn(ctdb, pnn)) {
91                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
92                 return;
93         }
94
95         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
96
97         bantime.pnn  = pnn;
98         bantime.time = ban_time;
99
100         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
101         if (ret != 0) {
102                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
103                 return;
104         }
105
106 }
107
108 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
109
110
111 /*
112   remember the trouble maker
113  */
114 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
115 {
116         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
117         struct ctdb_banning_state *ban_state;
118
119         if (culprit > ctdb->num_nodes) {
120                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
121                 return;
122         }
123
124         /* If we are banned or stopped, do not set other nodes as culprits */
125         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
126                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
127                 return;
128         }
129
130         if (ctdb->nodes[culprit]->ban_state == NULL) {
131                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
132                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
133
134                 
135         }
136         ban_state = ctdb->nodes[culprit]->ban_state;
137         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
138                 /* this was the first time in a long while this node
139                    misbehaved so we will forgive any old transgressions.
140                 */
141                 ban_state->count = 0;
142         }
143
144         ban_state->count += count;
145         ban_state->last_reported_time = timeval_current();
146         rec->last_culprit_node = culprit;
147 }
148
149 /*
150   remember the trouble maker
151  */
152 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
153 {
154         ctdb_set_culprit_count(rec, culprit, 1);
155 }
156
157
158 /* this callback is called for every node that failed to execute the
159    recovered event
160 */
161 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
162 {
163         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
164
165         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
166
167         ctdb_set_culprit(rec, node_pnn);
168 }
169
170 /*
171   run the "recovered" eventscript on all nodes
172  */
173 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
174 {
175         TALLOC_CTX *tmp_ctx;
176         uint32_t *nodes;
177         struct ctdb_context *ctdb = rec->ctdb;
178
179         tmp_ctx = talloc_new(ctdb);
180         CTDB_NO_MEMORY(ctdb, tmp_ctx);
181
182         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
183         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
184                                         nodes, 0,
185                                         CONTROL_TIMEOUT(), false, tdb_null,
186                                         NULL, recovered_fail_callback,
187                                         rec) != 0) {
188                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
189
190                 talloc_free(tmp_ctx);
191                 return -1;
192         }
193
194         talloc_free(tmp_ctx);
195         return 0;
196 }
197
198 /* this callback is called for every node that failed to execute the
199    start recovery event
200 */
201 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
202 {
203         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
204
205         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
206
207         ctdb_set_culprit(rec, node_pnn);
208 }
209
210 /*
211   run the "startrecovery" eventscript on all nodes
212  */
213 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
214 {
215         TALLOC_CTX *tmp_ctx;
216         uint32_t *nodes;
217         struct ctdb_context *ctdb = rec->ctdb;
218
219         tmp_ctx = talloc_new(ctdb);
220         CTDB_NO_MEMORY(ctdb, tmp_ctx);
221
222         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
223         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
224                                         nodes, 0,
225                                         CONTROL_TIMEOUT(), false, tdb_null,
226                                         NULL,
227                                         startrecovery_fail_callback,
228                                         rec) != 0) {
229                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
230                 talloc_free(tmp_ctx);
231                 return -1;
232         }
233
234         talloc_free(tmp_ctx);
235         return 0;
236 }
237
238 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
239 {
240         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
241                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
242                 return;
243         }
244         if (node_pnn < ctdb->num_nodes) {
245                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
246         }
247
248         if (node_pnn == ctdb->pnn) {
249                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
250         }
251 }
252
253 /*
254   update the node capabilities for all connected nodes
255  */
256 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
257 {
258         uint32_t *nodes;
259         TALLOC_CTX *tmp_ctx;
260
261         tmp_ctx = talloc_new(ctdb);
262         CTDB_NO_MEMORY(ctdb, tmp_ctx);
263
264         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
265         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
266                                         nodes, 0,
267                                         CONTROL_TIMEOUT(),
268                                         false, tdb_null,
269                                         async_getcap_callback, NULL,
270                                         NULL) != 0) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
272                 talloc_free(tmp_ctx);
273                 return -1;
274         }
275
276         talloc_free(tmp_ctx);
277         return 0;
278 }
279
280 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
281 {
282         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
283
284         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
285         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
286 }
287
288 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
289 {
290         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
291
292         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
293         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
294 }
295
296 /*
297   change recovery mode on all nodes
298  */
299 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
300 {
301         TDB_DATA data;
302         uint32_t *nodes;
303         TALLOC_CTX *tmp_ctx;
304
305         tmp_ctx = talloc_new(ctdb);
306         CTDB_NO_MEMORY(ctdb, tmp_ctx);
307
308         /* freeze all nodes */
309         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
310         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
311                 int i;
312
313                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
314                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
315                                                 nodes, i,
316                                                 CONTROL_TIMEOUT(),
317                                                 false, tdb_null,
318                                                 NULL,
319                                                 set_recmode_fail_callback,
320                                                 rec) != 0) {
321                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
322                                 talloc_free(tmp_ctx);
323                                 return -1;
324                         }
325                 }
326         }
327
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&rec_mode;
331
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(),
335                                         false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /*
348   change recovery master on all node
349  */
350 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
351 {
352         TDB_DATA data;
353         TALLOC_CTX *tmp_ctx;
354         uint32_t *nodes;
355
356         tmp_ctx = talloc_new(ctdb);
357         CTDB_NO_MEMORY(ctdb, tmp_ctx);
358
359         data.dsize = sizeof(uint32_t);
360         data.dptr = (unsigned char *)&pnn;
361
362         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
363         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
364                                         nodes, 0,
365                                         CONTROL_TIMEOUT(), false, data,
366                                         NULL, NULL,
367                                         NULL) != 0) {
368                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
369                 talloc_free(tmp_ctx);
370                 return -1;
371         }
372
373         talloc_free(tmp_ctx);
374         return 0;
375 }
376
377 /* update all remote nodes to use the same db priority that we have
378    this can fail if the remove node has not yet been upgraded to 
379    support this function, so we always return success and never fail
380    a recovery if this call fails.
381 */
382 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
383         struct ctdb_node_map *nodemap, 
384         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
385 {
386         int db;
387         uint32_t *nodes;
388
389         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
390
391         /* step through all local databases */
392         for (db=0; db<dbmap->num;db++) {
393                 TDB_DATA data;
394                 struct ctdb_db_priority db_prio;
395                 int ret;
396
397                 db_prio.db_id     = dbmap->dbs[db].dbid;
398                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
399                 if (ret != 0) {
400                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
401                         continue;
402                 }
403
404                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
405
406                 data.dptr  = (uint8_t *)&db_prio;
407                 data.dsize = sizeof(db_prio);
408
409                 if (ctdb_client_async_control(ctdb,
410                                         CTDB_CONTROL_SET_DB_PRIORITY,
411                                         nodes, 0,
412                                         CONTROL_TIMEOUT(), false, data,
413                                         NULL, NULL,
414                                         NULL) != 0) {
415                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
416                 }
417         }
418
419         return 0;
420 }                       
421
422 /*
423   ensure all other nodes have attached to any databases that we have
424  */
425 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
426                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
427 {
428         int i, j, db, ret;
429         struct ctdb_dbid_map *remote_dbmap;
430
431         /* verify that all other nodes have all our databases */
432         for (j=0; j<nodemap->num; j++) {
433                 /* we dont need to ourself ourselves */
434                 if (nodemap->nodes[j].pnn == pnn) {
435                         continue;
436                 }
437                 /* dont check nodes that are unavailable */
438                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
439                         continue;
440                 }
441
442                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
443                                          mem_ctx, &remote_dbmap);
444                 if (ret != 0) {
445                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
446                         return -1;
447                 }
448
449                 /* step through all local databases */
450                 for (db=0; db<dbmap->num;db++) {
451                         const char *name;
452
453
454                         for (i=0;i<remote_dbmap->num;i++) {
455                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
456                                         break;
457                                 }
458                         }
459                         /* the remote node already have this database */
460                         if (i!=remote_dbmap->num) {
461                                 continue;
462                         }
463                         /* ok so we need to create this database */
464                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
465                                             mem_ctx, &name);
466                         if (ret != 0) {
467                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
468                                 return -1;
469                         }
470                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
471                                            mem_ctx, name,
472                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
473                         if (ret != 0) {
474                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
475                                 return -1;
476                         }
477                 }
478         }
479
480         return 0;
481 }
482
483
484 /*
485   ensure we are attached to any databases that anyone else is attached to
486  */
487 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
488                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
489 {
490         int i, j, db, ret;
491         struct ctdb_dbid_map *remote_dbmap;
492
493         /* verify that we have all database any other node has */
494         for (j=0; j<nodemap->num; j++) {
495                 /* we dont need to ourself ourselves */
496                 if (nodemap->nodes[j].pnn == pnn) {
497                         continue;
498                 }
499                 /* dont check nodes that are unavailable */
500                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
501                         continue;
502                 }
503
504                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
505                                          mem_ctx, &remote_dbmap);
506                 if (ret != 0) {
507                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
508                         return -1;
509                 }
510
511                 /* step through all databases on the remote node */
512                 for (db=0; db<remote_dbmap->num;db++) {
513                         const char *name;
514
515                         for (i=0;i<(*dbmap)->num;i++) {
516                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
517                                         break;
518                                 }
519                         }
520                         /* we already have this db locally */
521                         if (i!=(*dbmap)->num) {
522                                 continue;
523                         }
524                         /* ok so we need to create this database and
525                            rebuild dbmap
526                          */
527                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
528                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
529                         if (ret != 0) {
530                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
531                                           nodemap->nodes[j].pnn));
532                                 return -1;
533                         }
534                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
535                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
536                         if (ret != 0) {
537                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
538                                 return -1;
539                         }
540                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
541                         if (ret != 0) {
542                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
543                                 return -1;
544                         }
545                 }
546         }
547
548         return 0;
549 }
550
551
552 /*
553   pull the remote database contents from one node into the recdb
554  */
555 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
556                                     struct tdb_wrap *recdb, uint32_t dbid)
557 {
558         int ret;
559         TDB_DATA outdata;
560         struct ctdb_marshall_buffer *reply;
561         struct ctdb_rec_data *rec;
562         int i;
563         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
564
565         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
566                                CONTROL_TIMEOUT(), &outdata);
567         if (ret != 0) {
568                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
569                 talloc_free(tmp_ctx);
570                 return -1;
571         }
572
573         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
574
575         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
576                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
577                 talloc_free(tmp_ctx);
578                 return -1;
579         }
580         
581         rec = (struct ctdb_rec_data *)&reply->data[0];
582         
583         for (i=0;
584              i<reply->count;
585              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
586                 TDB_DATA key, data;
587                 struct ctdb_ltdb_header *hdr;
588                 TDB_DATA existing;
589                 
590                 key.dptr = &rec->data[0];
591                 key.dsize = rec->keylen;
592                 data.dptr = &rec->data[key.dsize];
593                 data.dsize = rec->datalen;
594                 
595                 hdr = (struct ctdb_ltdb_header *)data.dptr;
596
597                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
598                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
599                         talloc_free(tmp_ctx);
600                         return -1;
601                 }
602
603                 /* fetch the existing record, if any */
604                 existing = tdb_fetch(recdb->tdb, key);
605                 
606                 if (existing.dptr != NULL) {
607                         struct ctdb_ltdb_header header;
608                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
609                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
610                                          (unsigned)existing.dsize, srcnode));
611                                 free(existing.dptr);
612                                 talloc_free(tmp_ctx);
613                                 return -1;
614                         }
615                         header = *(struct ctdb_ltdb_header *)existing.dptr;
616                         free(existing.dptr);
617                         if (!(header.rsn < hdr->rsn ||
618                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
619                                 continue;
620                         }
621                 }
622                 
623                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
624                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
625                         talloc_free(tmp_ctx);
626                         return -1;                              
627                 }
628         }
629
630         talloc_free(tmp_ctx);
631
632         return 0;
633 }
634
635
636 struct pull_seqnum_cbdata {
637         int failed;
638         uint32_t pnn;
639         uint64_t seqnum;
640 };
641
642 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
643 {
644         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
645         uint64_t seqnum;
646
647         if (cb_data->failed != 0) {
648                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
649                 return;
650         }
651
652         if (res != 0) {
653                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
654                 cb_data->failed = 1;
655                 return;
656         }
657
658         if (outdata.dsize != sizeof(uint64_t)) {
659                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
660                 cb_data->failed = -1;
661                 return;
662         }
663
664         seqnum = *((uint64_t *)outdata.dptr);
665
666         if (seqnum > cb_data->seqnum) {
667                 cb_data->seqnum = seqnum;
668                 cb_data->pnn = node_pnn;
669         }
670 }
671
672 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
673 {
674         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
675
676         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
677         cb_data->failed = 1;
678 }
679
680 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
681                                 struct ctdb_recoverd *rec, 
682                                 struct ctdb_node_map *nodemap, 
683                                 struct tdb_wrap *recdb, uint32_t dbid)
684 {
685         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
686         uint32_t *nodes;
687         TDB_DATA data;
688         uint32_t outdata[2];
689         struct pull_seqnum_cbdata *cb_data;
690
691         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
692
693         outdata[0] = dbid;
694         outdata[1] = 0;
695
696         data.dsize = sizeof(outdata);
697         data.dptr  = (uint8_t *)&outdata[0];
698
699         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
700         if (cb_data == NULL) {
701                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
702                 talloc_free(tmp_ctx);
703                 return -1;
704         }
705
706         cb_data->failed = 0;
707         cb_data->pnn    = -1;
708         cb_data->seqnum = 0;
709         
710         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
711         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
712                                         nodes, 0,
713                                         CONTROL_TIMEOUT(), false, data,
714                                         pull_seqnum_cb,
715                                         pull_seqnum_fail_cb,
716                                         cb_data) != 0) {
717                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
718
719                 talloc_free(tmp_ctx);
720                 return -1;
721         }
722
723         if (cb_data->failed != 0) {
724                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
725                 talloc_free(tmp_ctx);
726                 return -1;
727         }
728
729         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
730                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
731                 talloc_free(tmp_ctx);
732                 return -1;
733         }
734
735         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
736
737         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
738                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
739                 talloc_free(tmp_ctx);
740                 return -1;
741         }
742
743         talloc_free(tmp_ctx);
744         return 0;
745 }
746
747
748 /*
749   pull all the remote database contents into the recdb
750  */
751 static int pull_remote_database(struct ctdb_context *ctdb,
752                                 struct ctdb_recoverd *rec, 
753                                 struct ctdb_node_map *nodemap, 
754                                 struct tdb_wrap *recdb, uint32_t dbid,
755                                 bool persistent)
756 {
757         int j;
758
759         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
760                 int ret;
761                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
762                 if (ret == 0) {
763                         return 0;
764                 }
765         }
766
767         /* pull all records from all other nodes across onto this node
768            (this merges based on rsn)
769         */
770         for (j=0; j<nodemap->num; j++) {
771                 /* dont merge from nodes that are unavailable */
772                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
773                         continue;
774                 }
775                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
776                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
777                                  nodemap->nodes[j].pnn));
778                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
779                         return -1;
780                 }
781         }
782         
783         return 0;
784 }
785
786
787 /*
788   update flags on all active nodes
789  */
790 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
791 {
792         int ret;
793
794         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
795                 if (ret != 0) {
796                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
797                 return -1;
798         }
799
800         return 0;
801 }
802
803 /*
804   ensure all nodes have the same vnnmap we do
805  */
806 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
807                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
808 {
809         int j, ret;
810
811         /* push the new vnn map out to all the nodes */
812         for (j=0; j<nodemap->num; j++) {
813                 /* dont push to nodes that are unavailable */
814                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
815                         continue;
816                 }
817
818                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
819                 if (ret != 0) {
820                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
821                         return -1;
822                 }
823         }
824
825         return 0;
826 }
827
828
829 struct vacuum_info {
830         struct vacuum_info *next, *prev;
831         struct ctdb_recoverd *rec;
832         uint32_t srcnode;
833         struct ctdb_db_context *ctdb_db;
834         struct ctdb_marshall_buffer *recs;
835         struct ctdb_rec_data *r;
836 };
837
838 static void vacuum_fetch_next(struct vacuum_info *v);
839
840 /*
841   called when a vacuum fetch has completed - just free it and do the next one
842  */
843 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
844 {
845         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
846         talloc_free(state);
847         vacuum_fetch_next(v);
848 }
849
850
851 /*
852   process the next element from the vacuum list
853 */
854 static void vacuum_fetch_next(struct vacuum_info *v)
855 {
856         struct ctdb_call call;
857         struct ctdb_rec_data *r;
858
859         while (v->recs->count) {
860                 struct ctdb_client_call_state *state;
861                 TDB_DATA data;
862                 struct ctdb_ltdb_header *hdr;
863
864                 ZERO_STRUCT(call);
865                 call.call_id = CTDB_NULL_FUNC;
866                 call.flags = CTDB_IMMEDIATE_MIGRATION;
867                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
868
869                 r = v->r;
870                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
871                 v->recs->count--;
872
873                 call.key.dptr = &r->data[0];
874                 call.key.dsize = r->keylen;
875
876                 /* ensure we don't block this daemon - just skip a record if we can't get
877                    the chainlock */
878                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
879                         continue;
880                 }
881
882                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
883                 if (data.dptr == NULL) {
884                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
885                         continue;
886                 }
887
888                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
889                         free(data.dptr);
890                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
891                         continue;
892                 }
893                 
894                 hdr = (struct ctdb_ltdb_header *)data.dptr;
895                 if (hdr->dmaster == v->rec->ctdb->pnn) {
896                         /* its already local */
897                         free(data.dptr);
898                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
899                         continue;
900                 }
901
902                 free(data.dptr);
903
904                 state = ctdb_call_send(v->ctdb_db, &call);
905                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
906                 if (state == NULL) {
907                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
908                         talloc_free(v);
909                         return;
910                 }
911                 state->async.fn = vacuum_fetch_callback;
912                 state->async.private_data = v;
913                 return;
914         }
915
916         talloc_free(v);
917 }
918
919
920 /*
921   destroy a vacuum info structure
922  */
923 static int vacuum_info_destructor(struct vacuum_info *v)
924 {
925         DLIST_REMOVE(v->rec->vacuum_info, v);
926         return 0;
927 }
928
929
930 /*
931   handler for vacuum fetch
932 */
933 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
934                                  TDB_DATA data, void *private_data)
935 {
936         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
937         struct ctdb_marshall_buffer *recs;
938         int ret, i;
939         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
940         const char *name;
941         struct ctdb_dbid_map *dbmap=NULL;
942         bool persistent = false;
943         struct ctdb_db_context *ctdb_db;
944         struct ctdb_rec_data *r;
945         uint32_t srcnode;
946         struct vacuum_info *v;
947
948         recs = (struct ctdb_marshall_buffer *)data.dptr;
949         r = (struct ctdb_rec_data *)&recs->data[0];
950
951         if (recs->count == 0) {
952                 talloc_free(tmp_ctx);
953                 return;
954         }
955
956         srcnode = r->reqid;
957
958         for (v=rec->vacuum_info;v;v=v->next) {
959                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
960                         /* we're already working on records from this node */
961                         talloc_free(tmp_ctx);
962                         return;
963                 }
964         }
965
966         /* work out if the database is persistent */
967         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
968         if (ret != 0) {
969                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
970                 talloc_free(tmp_ctx);
971                 return;
972         }
973
974         for (i=0;i<dbmap->num;i++) {
975                 if (dbmap->dbs[i].dbid == recs->db_id) {
976                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
977                         break;
978                 }
979         }
980         if (i == dbmap->num) {
981                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
982                 talloc_free(tmp_ctx);
983                 return;         
984         }
985
986         /* find the name of this database */
987         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
988                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
989                 talloc_free(tmp_ctx);
990                 return;
991         }
992
993         /* attach to it */
994         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
995         if (ctdb_db == NULL) {
996                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
997                 talloc_free(tmp_ctx);
998                 return;
999         }
1000
1001         v = talloc_zero(rec, struct vacuum_info);
1002         if (v == NULL) {
1003                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1004                 talloc_free(tmp_ctx);
1005                 return;
1006         }
1007
1008         v->rec = rec;
1009         v->srcnode = srcnode;
1010         v->ctdb_db = ctdb_db;
1011         v->recs = talloc_memdup(v, recs, data.dsize);
1012         if (v->recs == NULL) {
1013                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1014                 talloc_free(v);
1015                 talloc_free(tmp_ctx);
1016                 return;         
1017         }
1018         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1019
1020         DLIST_ADD(rec->vacuum_info, v);
1021
1022         talloc_set_destructor(v, vacuum_info_destructor);
1023
1024         vacuum_fetch_next(v);
1025         talloc_free(tmp_ctx);
1026 }
1027
1028
1029 /*
1030   called when ctdb_wait_timeout should finish
1031  */
1032 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1033                               struct timeval yt, void *p)
1034 {
1035         uint32_t *timed_out = (uint32_t *)p;
1036         (*timed_out) = 1;
1037 }
1038
1039 /*
1040   wait for a given number of seconds
1041  */
1042 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1043 {
1044         uint32_t timed_out = 0;
1045         time_t usecs = (secs - (time_t)secs) * 1000000;
1046         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1047         while (!timed_out) {
1048                 event_loop_once(ctdb->ev);
1049         }
1050 }
1051
1052 /*
1053   called when an election times out (ends)
1054  */
1055 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1056                                   struct timeval t, void *p)
1057 {
1058         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1059         rec->election_timeout = NULL;
1060         fast_start = false;
1061
1062         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1063 }
1064
1065
1066 /*
1067   wait for an election to finish. It finished election_timeout seconds after
1068   the last election packet is received
1069  */
1070 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1071 {
1072         struct ctdb_context *ctdb = rec->ctdb;
1073         while (rec->election_timeout) {
1074                 event_loop_once(ctdb->ev);
1075         }
1076 }
1077
1078 /*
1079   Update our local flags from all remote connected nodes. 
1080   This is only run when we are or we belive we are the recovery master
1081  */
1082 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1083 {
1084         int j;
1085         struct ctdb_context *ctdb = rec->ctdb;
1086         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1087
1088         /* get the nodemap for all active remote nodes and verify
1089            they are the same as for this node
1090          */
1091         for (j=0; j<nodemap->num; j++) {
1092                 struct ctdb_node_map *remote_nodemap=NULL;
1093                 int ret;
1094
1095                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1096                         continue;
1097                 }
1098                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1099                         continue;
1100                 }
1101
1102                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1103                                            mem_ctx, &remote_nodemap);
1104                 if (ret != 0) {
1105                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1106                                   nodemap->nodes[j].pnn));
1107                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1108                         talloc_free(mem_ctx);
1109                         return MONITOR_FAILED;
1110                 }
1111                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1112                         /* We should tell our daemon about this so it
1113                            updates its flags or else we will log the same 
1114                            message again in the next iteration of recovery.
1115                            Since we are the recovery master we can just as
1116                            well update the flags on all nodes.
1117                         */
1118                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1119                         if (ret != 0) {
1120                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1121                                 return -1;
1122                         }
1123
1124                         /* Update our local copy of the flags in the recovery
1125                            daemon.
1126                         */
1127                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1128                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1129                                  nodemap->nodes[j].flags));
1130                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1131                 }
1132                 talloc_free(remote_nodemap);
1133         }
1134         talloc_free(mem_ctx);
1135         return MONITOR_OK;
1136 }
1137
1138
1139 /* Create a new random generation ip. 
1140    The generation id can not be the INVALID_GENERATION id
1141 */
1142 static uint32_t new_generation(void)
1143 {
1144         uint32_t generation;
1145
1146         while (1) {
1147                 generation = random();
1148
1149                 if (generation != INVALID_GENERATION) {
1150                         break;
1151                 }
1152         }
1153
1154         return generation;
1155 }
1156
1157
1158 /*
1159   create a temporary working database
1160  */
1161 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1162 {
1163         char *name;
1164         struct tdb_wrap *recdb;
1165         unsigned tdb_flags;
1166
1167         /* open up the temporary recovery database */
1168         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1169                                ctdb->db_directory_state,
1170                                ctdb->pnn);
1171         if (name == NULL) {
1172                 return NULL;
1173         }
1174         unlink(name);
1175
1176         tdb_flags = TDB_NOLOCK;
1177         if (ctdb->valgrinding) {
1178                 tdb_flags |= TDB_NOMMAP;
1179         }
1180         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1181
1182         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1183                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1184         if (recdb == NULL) {
1185                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1186         }
1187
1188         talloc_free(name);
1189
1190         return recdb;
1191 }
1192
1193
1194 /* 
1195    a traverse function for pulling all relevant records from recdb
1196  */
1197 struct recdb_data {
1198         struct ctdb_context *ctdb;
1199         struct ctdb_marshall_buffer *recdata;
1200         uint32_t len;
1201         uint32_t allocated_len;
1202         bool failed;
1203         bool persistent;
1204 };
1205
1206 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1207 {
1208         struct recdb_data *params = (struct recdb_data *)p;
1209         struct ctdb_rec_data *rec;
1210         struct ctdb_ltdb_header *hdr;
1211
1212         /*
1213          * skip empty records - but NOT for persistent databases:
1214          *
1215          * The record-by-record mode of recovery deletes empty records.
1216          * For persistent databases, this can lead to data corruption
1217          * by deleting records that should be there:
1218          *
1219          * - Assume the cluster has been running for a while.
1220          *
1221          * - A record R in a persistent database has been created and
1222          *   deleted a couple of times, the last operation being deletion,
1223          *   leaving an empty record with a high RSN, say 10.
1224          *
1225          * - Now a node N is turned off.
1226          *
1227          * - This leaves the local database copy of D on N with the empty
1228          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1229          *   the copy of record R.
1230          *
1231          * - Now the record is created again while node N is turned off.
1232          *   This creates R with RSN = 1 on all nodes except for N.
1233          *
1234          * - Now node N is turned on again. The following recovery will chose
1235          *   the older empty copy of R due to RSN 10 > RSN 1.
1236          *
1237          * ==> Hence the record is gone after the recovery.
1238          *
1239          * On databases like Samba's registry, this can damage the higher-level
1240          * data structures built from the various tdb-level records.
1241          */
1242         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1243                 return 0;
1244         }
1245
1246         /* update the dmaster field to point to us */
1247         hdr = (struct ctdb_ltdb_header *)data.dptr;
1248         if (!params->persistent) {
1249                 hdr->dmaster = params->ctdb->pnn;
1250                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1251         }
1252
1253         /* add the record to the blob ready to send to the nodes */
1254         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1255         if (rec == NULL) {
1256                 params->failed = true;
1257                 return -1;
1258         }
1259         if (params->len + rec->length >= params->allocated_len) {
1260                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1261                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1262         }
1263         if (params->recdata == NULL) {
1264                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1265                          rec->length + params->len));
1266                 params->failed = true;
1267                 return -1;
1268         }
1269         params->recdata->count++;
1270         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1271         params->len += rec->length;
1272         talloc_free(rec);
1273
1274         return 0;
1275 }
1276
1277 /*
1278   push the recdb database out to all nodes
1279  */
1280 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1281                                bool persistent,
1282                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1283 {
1284         struct recdb_data params;
1285         struct ctdb_marshall_buffer *recdata;
1286         TDB_DATA outdata;
1287         TALLOC_CTX *tmp_ctx;
1288         uint32_t *nodes;
1289
1290         tmp_ctx = talloc_new(ctdb);
1291         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1292
1293         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1294         CTDB_NO_MEMORY(ctdb, recdata);
1295
1296         recdata->db_id = dbid;
1297
1298         params.ctdb = ctdb;
1299         params.recdata = recdata;
1300         params.len = offsetof(struct ctdb_marshall_buffer, data);
1301         params.allocated_len = params.len;
1302         params.failed = false;
1303         params.persistent = persistent;
1304
1305         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1306                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1307                 talloc_free(params.recdata);
1308                 talloc_free(tmp_ctx);
1309                 return -1;
1310         }
1311
1312         if (params.failed) {
1313                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1314                 talloc_free(params.recdata);
1315                 talloc_free(tmp_ctx);
1316                 return -1;              
1317         }
1318
1319         recdata = params.recdata;
1320
1321         outdata.dptr = (void *)recdata;
1322         outdata.dsize = params.len;
1323
1324         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1326                                         nodes, 0,
1327                                         CONTROL_TIMEOUT(), false, outdata,
1328                                         NULL, NULL,
1329                                         NULL) != 0) {
1330                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1331                 talloc_free(recdata);
1332                 talloc_free(tmp_ctx);
1333                 return -1;
1334         }
1335
1336         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1337                   dbid, recdata->count));
1338
1339         talloc_free(recdata);
1340         talloc_free(tmp_ctx);
1341
1342         return 0;
1343 }
1344
1345
1346 /*
1347   go through a full recovery on one database 
1348  */
1349 static int recover_database(struct ctdb_recoverd *rec, 
1350                             TALLOC_CTX *mem_ctx,
1351                             uint32_t dbid,
1352                             bool persistent,
1353                             uint32_t pnn, 
1354                             struct ctdb_node_map *nodemap,
1355                             uint32_t transaction_id)
1356 {
1357         struct tdb_wrap *recdb;
1358         int ret;
1359         struct ctdb_context *ctdb = rec->ctdb;
1360         TDB_DATA data;
1361         struct ctdb_control_wipe_database w;
1362         uint32_t *nodes;
1363
1364         recdb = create_recdb(ctdb, mem_ctx);
1365         if (recdb == NULL) {
1366                 return -1;
1367         }
1368
1369         /* pull all remote databases onto the recdb */
1370         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1371         if (ret != 0) {
1372                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1373                 return -1;
1374         }
1375
1376         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1377
1378         /* wipe all the remote databases. This is safe as we are in a transaction */
1379         w.db_id = dbid;
1380         w.transaction_id = transaction_id;
1381
1382         data.dptr = (void *)&w;
1383         data.dsize = sizeof(w);
1384
1385         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1386         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1387                                         nodes, 0,
1388                                         CONTROL_TIMEOUT(), false, data,
1389                                         NULL, NULL,
1390                                         NULL) != 0) {
1391                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1392                 talloc_free(recdb);
1393                 return -1;
1394         }
1395         
1396         /* push out the correct database. This sets the dmaster and skips 
1397            the empty records */
1398         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1399         if (ret != 0) {
1400                 talloc_free(recdb);
1401                 return -1;
1402         }
1403
1404         /* all done with this database */
1405         talloc_free(recdb);
1406
1407         return 0;
1408 }
1409
1410 /*
1411   reload the nodes file 
1412 */
1413 static void reload_nodes_file(struct ctdb_context *ctdb)
1414 {
1415         ctdb->nodes = NULL;
1416         ctdb_load_nodes_file(ctdb);
1417 }
1418
1419 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1420                                          struct ctdb_recoverd *rec,
1421                                          struct ctdb_node_map *nodemap,
1422                                          uint32_t *culprit)
1423 {
1424         int j;
1425         int ret;
1426
1427         if (ctdb->num_nodes != nodemap->num) {
1428                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1429                                   ctdb->num_nodes, nodemap->num));
1430                 if (culprit) {
1431                         *culprit = ctdb->pnn;
1432                 }
1433                 return -1;
1434         }
1435
1436         for (j=0; j<nodemap->num; j++) {
1437                 /* For readability */
1438                 struct ctdb_node *node = ctdb->nodes[j];
1439
1440                 /* release any existing data */
1441                 if (node->known_public_ips) {
1442                         talloc_free(node->known_public_ips);
1443                         node->known_public_ips = NULL;
1444                 }
1445                 if (node->available_public_ips) {
1446                         talloc_free(node->available_public_ips);
1447                         node->available_public_ips = NULL;
1448                 }
1449
1450                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1451                         continue;
1452                 }
1453
1454                 /* Retrieve the list of known public IPs from the node */
1455                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1456                                         CONTROL_TIMEOUT(),
1457                                         node->pnn,
1458                                         ctdb->nodes,
1459                                         0,
1460                                         &node->known_public_ips);
1461                 if (ret != 0) {
1462                         DEBUG(DEBUG_ERR,
1463                               ("Failed to read known public IPs from node: %u\n",
1464                                node->pnn));
1465                         if (culprit) {
1466                                 *culprit = node->pnn;
1467                         }
1468                         return -1;
1469                 }
1470
1471                 if (ctdb->do_checkpublicip &&
1472                     (rec->ip_check_disable_ctx == NULL) &&
1473                     verify_remote_ip_allocation(ctdb,
1474                                                  node->known_public_ips,
1475                                                  node->pnn)) {
1476                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1477                         rec->need_takeover_run = true;
1478                 }
1479
1480                 /* Retrieve the list of available public IPs from the node */
1481                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1482                                         CONTROL_TIMEOUT(),
1483                                         node->pnn,
1484                                         ctdb->nodes,
1485                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1486                                         &node->available_public_ips);
1487                 if (ret != 0) {
1488                         DEBUG(DEBUG_ERR,
1489                               ("Failed to read available public IPs from node: %u\n",
1490                                node->pnn));
1491                         if (culprit) {
1492                                 *culprit = node->pnn;
1493                         }
1494                         return -1;
1495                 }
1496         }
1497
1498         return 0;
1499 }
1500
1501 /* when we start a recovery, make sure all nodes use the same reclock file
1502    setting
1503 */
1504 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1505 {
1506         struct ctdb_context *ctdb = rec->ctdb;
1507         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1508         TDB_DATA data;
1509         uint32_t *nodes;
1510
1511         if (ctdb->recovery_lock_file == NULL) {
1512                 data.dptr  = NULL;
1513                 data.dsize = 0;
1514         } else {
1515                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1516                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1517         }
1518
1519         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1520         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1521                                         nodes, 0,
1522                                         CONTROL_TIMEOUT(),
1523                                         false, data,
1524                                         NULL, NULL,
1525                                         rec) != 0) {
1526                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1527                 talloc_free(tmp_ctx);
1528                 return -1;
1529         }
1530
1531         talloc_free(tmp_ctx);
1532         return 0;
1533 }
1534
1535
1536 /*
1537  * this callback is called for every node that failed to execute ctdb_takeover_run()
1538  * and set flag to re-run takeover run.
1539  */
1540 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1541 {
1542         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1543
1544         if (callback_data != NULL) {
1545                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1546
1547                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1548
1549                 ctdb_set_culprit(rec, node_pnn);
1550         }
1551 }
1552
1553
1554 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1555 {
1556         struct ctdb_context *ctdb = rec->ctdb;
1557         int i;
1558         struct ctdb_banning_state *ban_state;
1559
1560         *self_ban = false;
1561         for (i=0; i<ctdb->num_nodes; i++) {
1562                 if (ctdb->nodes[i]->ban_state == NULL) {
1563                         continue;
1564                 }
1565                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1566                 if (ban_state->count < 2*ctdb->num_nodes) {
1567                         continue;
1568                 }
1569
1570                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1571                         ctdb->nodes[i]->pnn, ban_state->count,
1572                         ctdb->tunable.recovery_ban_period));
1573                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1574                 ban_state->count = 0;
1575
1576                 /* Banning ourself? */
1577                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1578                         *self_ban = true;
1579                 }
1580         }
1581 }
1582
1583 static bool do_takeover_run(struct ctdb_recoverd *rec,
1584                             struct ctdb_node_map *nodemap,
1585                             bool banning_credits_on_fail)
1586 {
1587         uint32_t disable_timeout;
1588         TDB_DATA data;
1589         int ret;
1590         bool ok;
1591
1592         if (rec->takeover_run_in_progress) {
1593                 DEBUG(DEBUG_ERR, (__location__
1594                                   " takeover run already in progress \n"));
1595                 ok = false;
1596                 goto done;
1597         }
1598
1599         /* Disable IP checks while doing this takeover run.  This will
1600          * stop those other nodes from triggering takeover runs when
1601          * think they should be hosting an IP but it isn't yet on an
1602          * interface.
1603          */
1604         data.dptr  = (uint8_t*)&disable_timeout;
1605         data.dsize = sizeof(disable_timeout);
1606
1607         disable_timeout = rec->ctdb->tunable.takeover_timeout;
1608         if (ctdb_client_send_message(rec->ctdb, CTDB_BROADCAST_CONNECTED,
1609                                      CTDB_SRVID_DISABLE_IP_CHECK,
1610                                      data) != 0) {
1611                 DEBUG(DEBUG_INFO,("Failed to disable IP check\n"));
1612         }
1613
1614         rec->takeover_run_in_progress = true;
1615
1616         ret = ctdb_takeover_run(rec->ctdb, nodemap, takeover_fail_callback,
1617                                 banning_credits_on_fail ? rec : NULL);
1618
1619         /* Reenable IP checks */
1620         disable_timeout = 0;
1621         if (ctdb_client_send_message(rec->ctdb, CTDB_BROADCAST_CONNECTED,
1622                                      CTDB_SRVID_DISABLE_IP_CHECK,
1623                                      data) != 0) {
1624                 DEBUG(DEBUG_INFO,("Failed to reenable IP check\n"));
1625         }
1626
1627         if (ret != 0) {
1628                 DEBUG(DEBUG_ERR, ("IP reallocation failed\n"));
1629                 ok = false;
1630                 goto done;
1631         }
1632
1633         ok = true;
1634 done:
1635         rec->need_takeover_run = !ok;
1636         rec->takeover_run_in_progress = false;
1637         return ok;
1638 }
1639
1640
1641 /*
1642   we are the recmaster, and recovery is needed - start a recovery run
1643  */
1644 static int do_recovery(struct ctdb_recoverd *rec, 
1645                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1646                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1647 {
1648         struct ctdb_context *ctdb = rec->ctdb;
1649         int i, j, ret;
1650         uint32_t generation;
1651         struct ctdb_dbid_map *dbmap;
1652         TDB_DATA data;
1653         uint32_t *nodes;
1654         struct timeval start_time;
1655         uint32_t culprit = (uint32_t)-1;
1656         bool self_ban;
1657
1658         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1659
1660         /* if recovery fails, force it again */
1661         rec->need_recovery = true;
1662
1663         ban_misbehaving_nodes(rec, &self_ban);
1664         if (self_ban) {
1665                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1666                 return -1;
1667         }
1668
1669         if (ctdb->tunable.verify_recovery_lock != 0) {
1670                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1671                 start_time = timeval_current();
1672                 if (!ctdb_recovery_lock(ctdb, true)) {
1673                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1674                                          "and ban ourself for %u seconds\n",
1675                                          ctdb->tunable.recovery_ban_period));
1676                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1677                         return -1;
1678                 }
1679                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1680                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1681         }
1682
1683         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1684
1685         /* get a list of all databases */
1686         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1687         if (ret != 0) {
1688                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1689                 return -1;
1690         }
1691
1692         /* we do the db creation before we set the recovery mode, so the freeze happens
1693            on all databases we will be dealing with. */
1694
1695         /* verify that we have all the databases any other node has */
1696         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1697         if (ret != 0) {
1698                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1699                 return -1;
1700         }
1701
1702         /* verify that all other nodes have all our databases */
1703         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1704         if (ret != 0) {
1705                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1706                 return -1;
1707         }
1708         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1709
1710         /* update the database priority for all remote databases */
1711         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1712         if (ret != 0) {
1713                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1714         }
1715         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1716
1717
1718         /* update all other nodes to use the same setting for reclock files
1719            as the local recovery master.
1720         */
1721         sync_recovery_lock_file_across_cluster(rec);
1722
1723         /* set recovery mode to active on all nodes */
1724         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1725         if (ret != 0) {
1726                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1727                 return -1;
1728         }
1729
1730         /* execute the "startrecovery" event script on all nodes */
1731         ret = run_startrecovery_eventscript(rec, nodemap);
1732         if (ret!=0) {
1733                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1734                 return -1;
1735         }
1736
1737         /*
1738           update all nodes to have the same flags that we have
1739          */
1740         for (i=0;i<nodemap->num;i++) {
1741                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1742                         continue;
1743                 }
1744
1745                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1746                 if (ret != 0) {
1747                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1748                         return -1;
1749                 }
1750         }
1751
1752         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1753
1754         /* pick a new generation number */
1755         generation = new_generation();
1756
1757         /* change the vnnmap on this node to use the new generation 
1758            number but not on any other nodes.
1759            this guarantees that if we abort the recovery prematurely
1760            for some reason (a node stops responding?)
1761            that we can just return immediately and we will reenter
1762            recovery shortly again.
1763            I.e. we deliberately leave the cluster with an inconsistent
1764            generation id to allow us to abort recovery at any stage and
1765            just restart it from scratch.
1766          */
1767         vnnmap->generation = generation;
1768         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1769         if (ret != 0) {
1770                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1771                 return -1;
1772         }
1773
1774         data.dptr = (void *)&generation;
1775         data.dsize = sizeof(uint32_t);
1776
1777         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1778         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1779                                         nodes, 0,
1780                                         CONTROL_TIMEOUT(), false, data,
1781                                         NULL,
1782                                         transaction_start_fail_callback,
1783                                         rec) != 0) {
1784                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1785                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1786                                         nodes, 0,
1787                                         CONTROL_TIMEOUT(), false, tdb_null,
1788                                         NULL,
1789                                         NULL,
1790                                         NULL) != 0) {
1791                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1792                 }
1793                 return -1;
1794         }
1795
1796         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1797
1798         for (i=0;i<dbmap->num;i++) {
1799                 ret = recover_database(rec, mem_ctx,
1800                                        dbmap->dbs[i].dbid,
1801                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1802                                        pnn, nodemap, generation);
1803                 if (ret != 0) {
1804                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1805                         return -1;
1806                 }
1807         }
1808
1809         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1810
1811         /* commit all the changes */
1812         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1813                                         nodes, 0,
1814                                         CONTROL_TIMEOUT(), false, data,
1815                                         NULL, NULL,
1816                                         NULL) != 0) {
1817                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1818                 return -1;
1819         }
1820
1821         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1822         
1823
1824         /* update the capabilities for all nodes */
1825         ret = update_capabilities(ctdb, nodemap);
1826         if (ret!=0) {
1827                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1828                 return -1;
1829         }
1830
1831         /* build a new vnn map with all the currently active and
1832            unbanned nodes */
1833         generation = new_generation();
1834         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1835         CTDB_NO_MEMORY(ctdb, vnnmap);
1836         vnnmap->generation = generation;
1837         vnnmap->size = 0;
1838         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1839         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1840         for (i=j=0;i<nodemap->num;i++) {
1841                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1842                         continue;
1843                 }
1844                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1845                         /* this node can not be an lmaster */
1846                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1847                         continue;
1848                 }
1849
1850                 vnnmap->size++;
1851                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1852                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1853                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1854
1855         }
1856         if (vnnmap->size == 0) {
1857                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1858                 vnnmap->size++;
1859                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1860                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1861                 vnnmap->map[0] = pnn;
1862         }       
1863
1864         /* update to the new vnnmap on all nodes */
1865         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1868                 return -1;
1869         }
1870
1871         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1872
1873         /* update recmaster to point to us for all nodes */
1874         ret = set_recovery_master(ctdb, nodemap, pnn);
1875         if (ret!=0) {
1876                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1877                 return -1;
1878         }
1879
1880         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1881
1882         /*
1883           update all nodes to have the same flags that we have
1884          */
1885         for (i=0;i<nodemap->num;i++) {
1886                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1887                         continue;
1888                 }
1889
1890                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1891                 if (ret != 0) {
1892                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1893                         return -1;
1894                 }
1895         }
1896
1897         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1898
1899         /* disable recovery mode */
1900         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1901         if (ret != 0) {
1902                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1903                 return -1;
1904         }
1905
1906         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1907
1908         /* Fetch known/available public IPs from each active node */
1909         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1912                                  culprit));
1913                 rec->need_takeover_run = true;
1914                 return -1;
1915         }
1916
1917         do_takeover_run(rec, nodemap, false);
1918
1919         /* execute the "recovered" event script on all nodes */
1920         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1921         if (ret!=0) {
1922                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1923                 return -1;
1924         }
1925
1926         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1927
1928         /* send a message to all clients telling them that the cluster 
1929            has been reconfigured */
1930         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1931
1932         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1933
1934         rec->need_recovery = false;
1935
1936         /* we managed to complete a full recovery, make sure to forgive
1937            any past sins by the nodes that could now participate in the
1938            recovery.
1939         */
1940         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1941         for (i=0;i<nodemap->num;i++) {
1942                 struct ctdb_banning_state *ban_state;
1943
1944                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1945                         continue;
1946                 }
1947
1948                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1949                 if (ban_state == NULL) {
1950                         continue;
1951                 }
1952
1953                 ban_state->count = 0;
1954         }
1955
1956
1957         /* We just finished a recovery successfully. 
1958            We now wait for rerecovery_timeout before we allow 
1959            another recovery to take place.
1960         */
1961         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1962         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1963         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1964
1965         return 0;
1966 }
1967
1968
1969 /*
1970   elections are won by first checking the number of connected nodes, then
1971   the priority time, then the pnn
1972  */
1973 struct election_message {
1974         uint32_t num_connected;
1975         struct timeval priority_time;
1976         uint32_t pnn;
1977         uint32_t node_flags;
1978 };
1979
1980 /*
1981   form this nodes election data
1982  */
1983 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1984 {
1985         int ret, i;
1986         struct ctdb_node_map *nodemap;
1987         struct ctdb_context *ctdb = rec->ctdb;
1988
1989         ZERO_STRUCTP(em);
1990
1991         em->pnn = rec->ctdb->pnn;
1992         em->priority_time = rec->priority_time;
1993
1994         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1995         if (ret != 0) {
1996                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1997                 return;
1998         }
1999
2000         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2001         em->node_flags = rec->node_flags;
2002
2003         for (i=0;i<nodemap->num;i++) {
2004                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2005                         em->num_connected++;
2006                 }
2007         }
2008
2009         /* we shouldnt try to win this election if we cant be a recmaster */
2010         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2011                 em->num_connected = 0;
2012                 em->priority_time = timeval_current();
2013         }
2014
2015         talloc_free(nodemap);
2016 }
2017
2018 /*
2019   see if the given election data wins
2020  */
2021 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2022 {
2023         struct election_message myem;
2024         int cmp = 0;
2025
2026         ctdb_election_data(rec, &myem);
2027
2028         /* we cant win if we dont have the recmaster capability */
2029         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2030                 return false;
2031         }
2032
2033         /* we cant win if we are banned */
2034         if (rec->node_flags & NODE_FLAGS_BANNED) {
2035                 return false;
2036         }
2037
2038         /* we cant win if we are stopped */
2039         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2040                 return false;
2041         }
2042
2043         /* we will automatically win if the other node is banned */
2044         if (em->node_flags & NODE_FLAGS_BANNED) {
2045                 return true;
2046         }
2047
2048         /* we will automatically win if the other node is banned */
2049         if (em->node_flags & NODE_FLAGS_STOPPED) {
2050                 return true;
2051         }
2052
2053         /* try to use the most connected node */
2054         if (cmp == 0) {
2055                 cmp = (int)myem.num_connected - (int)em->num_connected;
2056         }
2057
2058         /* then the longest running node */
2059         if (cmp == 0) {
2060                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2061         }
2062
2063         if (cmp == 0) {
2064                 cmp = (int)myem.pnn - (int)em->pnn;
2065         }
2066
2067         return cmp > 0;
2068 }
2069
2070 /*
2071   send out an election request
2072  */
2073 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2074 {
2075         int ret;
2076         TDB_DATA election_data;
2077         struct election_message emsg;
2078         uint64_t srvid;
2079         struct ctdb_context *ctdb = rec->ctdb;
2080
2081         srvid = CTDB_SRVID_RECOVERY;
2082
2083         ctdb_election_data(rec, &emsg);
2084
2085         election_data.dsize = sizeof(struct election_message);
2086         election_data.dptr  = (unsigned char *)&emsg;
2087
2088
2089         /* send an election message to all active nodes */
2090         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2091         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2092
2093
2094         /* A new node that is already frozen has entered the cluster.
2095            The existing nodes are not frozen and dont need to be frozen
2096            until the election has ended and we start the actual recovery
2097         */
2098         if (update_recmaster == true) {
2099                 /* first we assume we will win the election and set 
2100                    recoverymaster to be ourself on the current node
2101                  */
2102                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2103                 if (ret != 0) {
2104                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2105                         return -1;
2106                 }
2107         }
2108
2109
2110         return 0;
2111 }
2112
2113 /*
2114   this function will unban all nodes in the cluster
2115 */
2116 static void unban_all_nodes(struct ctdb_context *ctdb)
2117 {
2118         int ret, i;
2119         struct ctdb_node_map *nodemap;
2120         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2121         
2122         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2125                 return;
2126         }
2127
2128         for (i=0;i<nodemap->num;i++) {
2129                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2130                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2131                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2132                 }
2133         }
2134
2135         talloc_free(tmp_ctx);
2136 }
2137
2138
2139 /*
2140   we think we are winning the election - send a broadcast election request
2141  */
2142 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2143 {
2144         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2145         int ret;
2146
2147         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2148         if (ret != 0) {
2149                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2150         }
2151
2152         talloc_free(rec->send_election_te);
2153         rec->send_election_te = NULL;
2154 }
2155
2156 /*
2157   handler for memory dumps
2158 */
2159 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2160                              TDB_DATA data, void *private_data)
2161 {
2162         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2163         TDB_DATA *dump;
2164         int ret;
2165         struct rd_memdump_reply *rd;
2166
2167         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2168                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2169                 talloc_free(tmp_ctx);
2170                 return;
2171         }
2172         rd = (struct rd_memdump_reply *)data.dptr;
2173
2174         dump = talloc_zero(tmp_ctx, TDB_DATA);
2175         if (dump == NULL) {
2176                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2177                 talloc_free(tmp_ctx);
2178                 return;
2179         }
2180         ret = ctdb_dump_memory(ctdb, dump);
2181         if (ret != 0) {
2182                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2183                 talloc_free(tmp_ctx);
2184                 return;
2185         }
2186
2187 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2188
2189         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2190         if (ret != 0) {
2191                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2192                 talloc_free(tmp_ctx);
2193                 return;
2194         }
2195
2196         talloc_free(tmp_ctx);
2197 }
2198
2199 /*
2200   handler for getlog
2201 */
2202 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2203                            TDB_DATA data, void *private_data)
2204 {
2205         struct ctdb_get_log_addr *log_addr;
2206         pid_t child;
2207
2208         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2209                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2210                 return;
2211         }
2212         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2213
2214         child = ctdb_fork_no_free_ringbuffer(ctdb);
2215         if (child == (pid_t)-1) {
2216                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2217                 return;
2218         }
2219
2220         if (child == 0) {
2221                 ctdb_set_process_name("ctdb_rec_log_collector");
2222                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2223                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2224                         _exit(1);
2225                 }
2226                 ctdb_collect_log(ctdb, log_addr);
2227                 _exit(0);
2228         }
2229 }
2230
2231 /*
2232   handler for clearlog
2233 */
2234 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2235                              TDB_DATA data, void *private_data)
2236 {
2237         ctdb_clear_log(ctdb);
2238 }
2239
2240 /*
2241   handler for reload_nodes
2242 */
2243 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2244                              TDB_DATA data, void *private_data)
2245 {
2246         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2247
2248         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2249
2250         reload_nodes_file(rec->ctdb);
2251 }
2252
2253
2254 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2255                               struct timeval yt, void *p)
2256 {
2257         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2258
2259         talloc_free(rec->ip_check_disable_ctx);
2260         rec->ip_check_disable_ctx = NULL;
2261 }
2262
2263
2264 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2265                                   struct timeval t, void *p)
2266 {
2267         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2268
2269         DEBUG(DEBUG_NOTICE,
2270               ("Rebalance all nodes that have had ip assignment changes.\n"));
2271
2272         do_takeover_run(rec, rec->nodemap, false);
2273
2274         talloc_free(rec->deferred_rebalance_ctx);
2275         rec->deferred_rebalance_ctx = NULL;
2276 }
2277
2278         
2279 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2280                              TDB_DATA data, void *private_data)
2281 {
2282         uint32_t pnn;
2283         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2284
2285         if (data.dsize != sizeof(uint32_t)) {
2286                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2287                 return;
2288         }
2289
2290         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2291                 return;
2292         }
2293
2294         pnn = *(uint32_t *)&data.dptr[0];
2295
2296         lcp2_forcerebalance(ctdb, pnn);
2297         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2298
2299         if (rec->deferred_rebalance_ctx != NULL) {
2300                 talloc_free(rec->deferred_rebalance_ctx);
2301         }
2302         rec->deferred_rebalance_ctx = talloc_new(rec);
2303         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2304                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2305                         ctdb_rebalance_timeout, rec);
2306 }
2307
2308
2309
2310 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2311                              TDB_DATA data, void *private_data)
2312 {
2313         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2314         struct ctdb_public_ip *ip;
2315
2316         if (rec->recmaster != rec->ctdb->pnn) {
2317                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2318                 return;
2319         }
2320
2321         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2322                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2323                 return;
2324         }
2325
2326         ip = (struct ctdb_public_ip *)data.dptr;
2327
2328         update_ip_assignment_tree(rec->ctdb, ip);
2329 }
2330
2331
2332 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2333                              TDB_DATA data, void *private_data)
2334 {
2335         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2336         uint32_t timeout;
2337
2338         if (rec->ip_check_disable_ctx != NULL) {
2339                 talloc_free(rec->ip_check_disable_ctx);
2340                 rec->ip_check_disable_ctx = NULL;
2341         }
2342
2343         if (data.dsize != sizeof(uint32_t)) {
2344                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2345                                  "expexting %lu\n", (long unsigned)data.dsize,
2346                                  (long unsigned)sizeof(uint32_t)));
2347                 return;
2348         }
2349         if (data.dptr == NULL) {
2350                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2351                 return;
2352         }
2353
2354         timeout = *((uint32_t *)data.dptr);
2355
2356         if (timeout == 0) {
2357                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2358                 return;
2359         }
2360                 
2361         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2362
2363         rec->ip_check_disable_ctx = talloc_new(rec);
2364         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2365
2366         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2367 }
2368
2369
2370 /*
2371   handler for reload all ips.
2372 */
2373 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2374                              TDB_DATA data, void *private_data)
2375 {
2376         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2377
2378         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2379                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2380                 return;
2381         }
2382
2383         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2384
2385         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2386         return;
2387 }
2388
2389 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2390 {
2391         uint32_t *status = callback_data;
2392
2393         if (res != 0) {
2394                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2395                 *status = 1;
2396         }
2397 }
2398
2399 static int
2400 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2401 {
2402         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2403         uint32_t *nodes;
2404         uint32_t status;
2405         int i;
2406
2407         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2408         for (i = 0; i< nodemap->num; i++) {
2409                 if (nodemap->nodes[i].flags != 0) {
2410                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2411                         talloc_free(tmp_ctx);
2412                         return -1;
2413                 }
2414         }
2415
2416         /* send the flags update to all connected nodes */
2417         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2418         status = 0;
2419         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2420                                         nodes, 0,
2421                                         CONTROL_TIMEOUT(),
2422                                         false, tdb_null,
2423                                         async_reloadips_callback, NULL,
2424                                         &status) != 0) {
2425                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2426                 talloc_free(tmp_ctx);
2427                 return -1;
2428         }
2429
2430         if (status != 0) {
2431                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2432                 talloc_free(tmp_ctx);
2433                 return -1;
2434         }
2435
2436         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2437
2438         talloc_free(tmp_ctx);
2439         return 0;
2440 }
2441
2442
2443 /*
2444   handler for ip reallocate, just add it to the list of callers and 
2445   handle this later in the monitor_cluster loop so we do not recurse
2446   with other callers to takeover_run()
2447 */
2448 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2449                              TDB_DATA data, void *private_data)
2450 {
2451         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2452         struct ip_reallocate_list *caller;
2453
2454         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2455                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2456                 return;
2457         }
2458
2459         if (rec->ip_reallocate_ctx == NULL) {
2460                 rec->ip_reallocate_ctx = talloc_new(rec);
2461                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2462         }
2463
2464         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2465         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2466
2467         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2468         caller->next = rec->reallocate_callers;
2469         rec->reallocate_callers = caller;
2470
2471         return;
2472 }
2473
2474 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2475 {
2476         TDB_DATA result;
2477         int32_t ret;
2478         struct ip_reallocate_list *callers;
2479         uint32_t culprit;
2480
2481         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2482
2483         /* update the list of public ips that a node can handle for
2484            all connected nodes
2485         */
2486         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2487         if (ret != 0) {
2488                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2489                                  culprit));
2490                 rec->need_takeover_run = true;
2491         }
2492         if (ret == 0) {
2493                 if (do_takeover_run(rec, rec->nodemap, false)) {
2494                         ret = 0;
2495                 } else {
2496                         ret = -1;
2497                 }
2498         }
2499
2500         result.dsize = sizeof(int32_t);
2501         result.dptr  = (uint8_t *)&ret;
2502
2503         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2504
2505                 /* Someone that sent srvid==0 does not want a reply */
2506                 if (callers->rd->srvid == 0) {
2507                         continue;
2508                 }
2509                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2510                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2511                                   (unsigned long long)callers->rd->srvid));
2512                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2513                 if (ret != 0) {
2514                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2515                                          "message to %u:%llu\n",
2516                                          (unsigned)callers->rd->pnn,
2517                                          (unsigned long long)callers->rd->srvid));
2518                 }
2519         }
2520
2521         talloc_free(rec->ip_reallocate_ctx);
2522         rec->ip_reallocate_ctx = NULL;
2523         rec->reallocate_callers = NULL;
2524 }
2525
2526
2527 /*
2528   handler for recovery master elections
2529 */
2530 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2531                              TDB_DATA data, void *private_data)
2532 {
2533         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2534         int ret;
2535         struct election_message *em = (struct election_message *)data.dptr;
2536         TALLOC_CTX *mem_ctx;
2537
2538         /* we got an election packet - update the timeout for the election */
2539         talloc_free(rec->election_timeout);
2540         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2541                                                 fast_start ?
2542                                                 timeval_current_ofs(0, 500000) :
2543                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2544                                                 ctdb_election_timeout, rec);
2545
2546         mem_ctx = talloc_new(ctdb);
2547
2548         /* someone called an election. check their election data
2549            and if we disagree and we would rather be the elected node, 
2550            send a new election message to all other nodes
2551          */
2552         if (ctdb_election_win(rec, em)) {
2553                 if (!rec->send_election_te) {
2554                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2555                                                                 timeval_current_ofs(0, 500000),
2556                                                                 election_send_request, rec);
2557                 }
2558                 talloc_free(mem_ctx);
2559                 /*unban_all_nodes(ctdb);*/
2560                 return;
2561         }
2562         
2563         /* we didn't win */
2564         talloc_free(rec->send_election_te);
2565         rec->send_election_te = NULL;
2566
2567         if (ctdb->tunable.verify_recovery_lock != 0) {
2568                 /* release the recmaster lock */
2569                 if (em->pnn != ctdb->pnn &&
2570                     ctdb->recovery_lock_fd != -1) {
2571                         close(ctdb->recovery_lock_fd);
2572                         ctdb->recovery_lock_fd = -1;
2573                         unban_all_nodes(ctdb);
2574                 }
2575         }
2576
2577         /* ok, let that guy become recmaster then */
2578         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2579         if (ret != 0) {
2580                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2581                 talloc_free(mem_ctx);
2582                 return;
2583         }
2584
2585         talloc_free(mem_ctx);
2586         return;
2587 }
2588
2589
2590 /*
2591   force the start of the election process
2592  */
2593 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2594                            struct ctdb_node_map *nodemap)
2595 {
2596         int ret;
2597         struct ctdb_context *ctdb = rec->ctdb;
2598
2599         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2600
2601         /* set all nodes to recovery mode to stop all internode traffic */
2602         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2603         if (ret != 0) {
2604                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2605                 return;
2606         }
2607
2608         talloc_free(rec->election_timeout);
2609         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2610                                                 fast_start ?
2611                                                 timeval_current_ofs(0, 500000) :
2612                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2613                                                 ctdb_election_timeout, rec);
2614
2615         ret = send_election_request(rec, pnn, true);
2616         if (ret!=0) {
2617                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2618                 return;
2619         }
2620
2621         /* wait for a few seconds to collect all responses */
2622         ctdb_wait_election(rec);
2623 }
2624
2625
2626
2627 /*
2628   handler for when a node changes its flags
2629 */
2630 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2631                             TDB_DATA data, void *private_data)
2632 {
2633         int ret;
2634         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2635         struct ctdb_node_map *nodemap=NULL;
2636         TALLOC_CTX *tmp_ctx;
2637         int i;
2638         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2639         int disabled_flag_changed;
2640
2641         if (data.dsize != sizeof(*c)) {
2642                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2643                 return;
2644         }
2645
2646         tmp_ctx = talloc_new(ctdb);
2647         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2648
2649         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2650         if (ret != 0) {
2651                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2652                 talloc_free(tmp_ctx);
2653                 return;         
2654         }
2655
2656
2657         for (i=0;i<nodemap->num;i++) {
2658                 if (nodemap->nodes[i].pnn == c->pnn) break;
2659         }
2660
2661         if (i == nodemap->num) {
2662                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2663                 talloc_free(tmp_ctx);
2664                 return;
2665         }
2666
2667         if (c->old_flags != c->new_flags) {
2668                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2669         }
2670
2671         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2672
2673         nodemap->nodes[i].flags = c->new_flags;
2674
2675         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2676                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2677
2678         if (ret == 0) {
2679                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2680                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2681         }
2682         
2683         if (ret == 0 &&
2684             ctdb->recovery_master == ctdb->pnn &&
2685             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2686                 /* Only do the takeover run if the perm disabled or unhealthy
2687                    flags changed since these will cause an ip failover but not
2688                    a recovery.
2689                    If the node became disconnected or banned this will also
2690                    lead to an ip address failover but that is handled 
2691                    during recovery
2692                 */
2693                 if (disabled_flag_changed) {
2694                         rec->need_takeover_run = true;
2695                 }
2696         }
2697
2698         talloc_free(tmp_ctx);
2699 }
2700
2701 /*
2702   handler for when we need to push out flag changes ot all other nodes
2703 */
2704 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2705                             TDB_DATA data, void *private_data)
2706 {
2707         int ret;
2708         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2709         struct ctdb_node_map *nodemap=NULL;
2710         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2711         uint32_t recmaster;
2712         uint32_t *nodes;
2713
2714         /* find the recovery master */
2715         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2716         if (ret != 0) {
2717                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2718                 talloc_free(tmp_ctx);
2719                 return;
2720         }
2721
2722         /* read the node flags from the recmaster */
2723         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2724         if (ret != 0) {
2725                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2726                 talloc_free(tmp_ctx);
2727                 return;
2728         }
2729         if (c->pnn >= nodemap->num) {
2730                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2731                 talloc_free(tmp_ctx);
2732                 return;
2733         }
2734
2735         /* send the flags update to all connected nodes */
2736         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2737
2738         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2739                                       nodes, 0, CONTROL_TIMEOUT(),
2740                                       false, data,
2741                                       NULL, NULL,
2742                                       NULL) != 0) {
2743                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2744
2745                 talloc_free(tmp_ctx);
2746                 return;
2747         }
2748
2749         talloc_free(tmp_ctx);
2750 }
2751
2752
2753 struct verify_recmode_normal_data {
2754         uint32_t count;
2755         enum monitor_result status;
2756 };
2757
2758 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2759 {
2760         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2761
2762
2763         /* one more node has responded with recmode data*/
2764         rmdata->count--;
2765
2766         /* if we failed to get the recmode, then return an error and let
2767            the main loop try again.
2768         */
2769         if (state->state != CTDB_CONTROL_DONE) {
2770                 if (rmdata->status == MONITOR_OK) {
2771                         rmdata->status = MONITOR_FAILED;
2772                 }
2773                 return;
2774         }
2775
2776         /* if we got a response, then the recmode will be stored in the
2777            status field
2778         */
2779         if (state->status != CTDB_RECOVERY_NORMAL) {
2780                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2781                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2782         }
2783
2784         return;
2785 }
2786
2787
2788 /* verify that all nodes are in normal recovery mode */
2789 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2790 {
2791         struct verify_recmode_normal_data *rmdata;
2792         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2793         struct ctdb_client_control_state *state;
2794         enum monitor_result status;
2795         int j;
2796         
2797         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2798         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2799         rmdata->count  = 0;
2800         rmdata->status = MONITOR_OK;
2801
2802         /* loop over all active nodes and send an async getrecmode call to 
2803            them*/
2804         for (j=0; j<nodemap->num; j++) {
2805                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2806                         continue;
2807                 }
2808                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2809                                         CONTROL_TIMEOUT(), 
2810                                         nodemap->nodes[j].pnn);
2811                 if (state == NULL) {
2812                         /* we failed to send the control, treat this as 
2813                            an error and try again next iteration
2814                         */                      
2815                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2816                         talloc_free(mem_ctx);
2817                         return MONITOR_FAILED;
2818                 }
2819
2820                 /* set up the callback functions */
2821                 state->async.fn = verify_recmode_normal_callback;
2822                 state->async.private_data = rmdata;
2823
2824                 /* one more control to wait for to complete */
2825                 rmdata->count++;
2826         }
2827
2828
2829         /* now wait for up to the maximum number of seconds allowed
2830            or until all nodes we expect a response from has replied
2831         */
2832         while (rmdata->count > 0) {
2833                 event_loop_once(ctdb->ev);
2834         }
2835
2836         status = rmdata->status;
2837         talloc_free(mem_ctx);
2838         return status;
2839 }
2840
2841
2842 struct verify_recmaster_data {
2843         struct ctdb_recoverd *rec;
2844         uint32_t count;
2845         uint32_t pnn;
2846         enum monitor_result status;
2847 };
2848
2849 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2850 {
2851         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2852
2853
2854         /* one more node has responded with recmaster data*/
2855         rmdata->count--;
2856
2857         /* if we failed to get the recmaster, then return an error and let
2858            the main loop try again.
2859         */
2860         if (state->state != CTDB_CONTROL_DONE) {
2861                 if (rmdata->status == MONITOR_OK) {
2862                         rmdata->status = MONITOR_FAILED;
2863                 }
2864                 return;
2865         }
2866
2867         /* if we got a response, then the recmaster will be stored in the
2868            status field
2869         */
2870         if (state->status != rmdata->pnn) {
2871                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2872                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2873                 rmdata->status = MONITOR_ELECTION_NEEDED;
2874         }
2875
2876         return;
2877 }
2878
2879
2880 /* verify that all nodes agree that we are the recmaster */
2881 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2882 {
2883         struct ctdb_context *ctdb = rec->ctdb;
2884         struct verify_recmaster_data *rmdata;
2885         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2886         struct ctdb_client_control_state *state;
2887         enum monitor_result status;
2888         int j;
2889         
2890         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2891         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2892         rmdata->rec    = rec;
2893         rmdata->count  = 0;
2894         rmdata->pnn    = pnn;
2895         rmdata->status = MONITOR_OK;
2896
2897         /* loop over all active nodes and send an async getrecmaster call to 
2898            them*/
2899         for (j=0; j<nodemap->num; j++) {
2900                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2901                         continue;
2902                 }
2903                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2904                                         CONTROL_TIMEOUT(),
2905                                         nodemap->nodes[j].pnn);
2906                 if (state == NULL) {
2907                         /* we failed to send the control, treat this as 
2908                            an error and try again next iteration
2909                         */                      
2910                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2911                         talloc_free(mem_ctx);
2912                         return MONITOR_FAILED;
2913                 }
2914
2915                 /* set up the callback functions */
2916                 state->async.fn = verify_recmaster_callback;
2917                 state->async.private_data = rmdata;
2918
2919                 /* one more control to wait for to complete */
2920                 rmdata->count++;
2921         }
2922
2923
2924         /* now wait for up to the maximum number of seconds allowed
2925            or until all nodes we expect a response from has replied
2926         */
2927         while (rmdata->count > 0) {
2928                 event_loop_once(ctdb->ev);
2929         }
2930
2931         status = rmdata->status;
2932         talloc_free(mem_ctx);
2933         return status;
2934 }
2935
2936 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2937                                     struct ctdb_recoverd *rec)
2938 {
2939         struct ctdb_control_get_ifaces *ifaces = NULL;
2940         TALLOC_CTX *mem_ctx;
2941         bool ret = false;
2942
2943         mem_ctx = talloc_new(NULL);
2944
2945         /* Read the interfaces from the local node */
2946         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2947                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2948                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2949                 /* We could return an error.  However, this will be
2950                  * rare so we'll decide that the interfaces have
2951                  * actually changed, just in case.
2952                  */
2953                 talloc_free(mem_ctx);
2954                 return true;
2955         }
2956
2957         if (!rec->ifaces) {
2958                 /* We haven't been here before so things have changed */
2959                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2960                 ret = true;
2961         } else if (rec->ifaces->num != ifaces->num) {
2962                 /* Number of interfaces has changed */
2963                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2964                                      rec->ifaces->num, ifaces->num));
2965                 ret = true;
2966         } else {
2967                 /* See if interface names or link states have changed */
2968                 int i;
2969                 for (i = 0; i < rec->ifaces->num; i++) {
2970                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2971                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2972                                 DEBUG(DEBUG_NOTICE,
2973                                       ("Interface in slot %d changed: %s => %s\n",
2974                                        i, iface->name, ifaces->ifaces[i].name));
2975                                 ret = true;
2976                                 break;
2977                         }
2978                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2979                                 DEBUG(DEBUG_NOTICE,
2980                                       ("Interface %s changed state: %d => %d\n",
2981                                        iface->name, iface->link_state,
2982                                        ifaces->ifaces[i].link_state));
2983                                 ret = true;
2984                                 break;
2985                         }
2986                 }
2987         }
2988
2989         talloc_free(rec->ifaces);
2990         rec->ifaces = talloc_steal(rec, ifaces);
2991
2992         talloc_free(mem_ctx);
2993         return ret;
2994 }
2995
2996 /* called to check that the local allocation of public ip addresses is ok.
2997 */
2998 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2999 {
3000         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3001         struct ctdb_uptime *uptime1 = NULL;
3002         struct ctdb_uptime *uptime2 = NULL;
3003         int ret, j;
3004         bool need_takeover_run = false;
3005
3006         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3007                                 CTDB_CURRENT_NODE, &uptime1);
3008         if (ret != 0) {
3009                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3010                 talloc_free(mem_ctx);
3011                 return -1;
3012         }
3013
3014         if (interfaces_have_changed(ctdb, rec)) {
3015                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3016                                      "local node %u - force takeover run\n",
3017                                      pnn));
3018                 need_takeover_run = true;
3019         }
3020
3021         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3022                                 CTDB_CURRENT_NODE, &uptime2);
3023         if (ret != 0) {
3024                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3025                 talloc_free(mem_ctx);
3026                 return -1;
3027         }
3028
3029         /* skip the check if the startrecovery time has changed */
3030         if (timeval_compare(&uptime1->last_recovery_started,
3031                             &uptime2->last_recovery_started) != 0) {
3032                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3033                 talloc_free(mem_ctx);
3034                 return 0;
3035         }
3036
3037         /* skip the check if the endrecovery time has changed */
3038         if (timeval_compare(&uptime1->last_recovery_finished,
3039                             &uptime2->last_recovery_finished) != 0) {
3040                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3041                 talloc_free(mem_ctx);
3042                 return 0;
3043         }
3044
3045         /* skip the check if we have started but not finished recovery */
3046         if (timeval_compare(&uptime1->last_recovery_finished,
3047                             &uptime1->last_recovery_started) != 1) {
3048                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3049                 talloc_free(mem_ctx);
3050
3051                 return 0;
3052         }
3053
3054         /* verify that we have the ip addresses we should have
3055            and we dont have ones we shouldnt have.
3056            if we find an inconsistency we set recmode to
3057            active on the local node and wait for the recmaster
3058            to do a full blown recovery.
3059            also if the pnn is -1 and we are healthy and can host the ip
3060            we also request a ip reallocation.
3061         */
3062         if (ctdb->tunable.disable_ip_failover == 0) {
3063                 struct ctdb_all_public_ips *ips = NULL;
3064
3065                 /* read the *available* IPs from the local node */
3066                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3067                 if (ret != 0) {
3068                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3069                         talloc_free(mem_ctx);
3070                         return -1;
3071                 }
3072
3073                 for (j=0; j<ips->num; j++) {
3074                         if (ips->ips[j].pnn == -1 &&
3075                             nodemap->nodes[pnn].flags == 0) {
3076                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3077                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3078                                 need_takeover_run = true;
3079                         }
3080                 }
3081
3082                 talloc_free(ips);
3083
3084                 /* read the *known* IPs from the local node */
3085                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3086                 if (ret != 0) {
3087                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3088                         talloc_free(mem_ctx);
3089                         return -1;
3090                 }
3091
3092                 for (j=0; j<ips->num; j++) {
3093                         if (ips->ips[j].pnn == pnn) {
3094                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3095                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3096                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3097                                         need_takeover_run = true;
3098                                 }
3099                         } else {
3100                                 if (ctdb->do_checkpublicip &&
3101                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3102
3103                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3104                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3105
3106                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3107                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3108                                         }
3109                                 }
3110                         }
3111                 }
3112         }
3113
3114         if (need_takeover_run) {
3115                 struct takeover_run_reply rd;
3116                 TDB_DATA data;
3117
3118                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3119
3120                 rd.pnn = ctdb->pnn;
3121                 rd.srvid = 0;
3122                 data.dptr = (uint8_t *)&rd;
3123                 data.dsize = sizeof(rd);
3124
3125                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3126                 if (ret != 0) {
3127                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3128                 }
3129         }
3130         talloc_free(mem_ctx);
3131         return 0;
3132 }
3133
3134
3135 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3136 {
3137         struct ctdb_node_map **remote_nodemaps = callback_data;
3138
3139         if (node_pnn >= ctdb->num_nodes) {
3140                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3141                 return;
3142         }
3143
3144         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3145
3146 }
3147
3148 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3149         struct ctdb_node_map *nodemap,
3150         struct ctdb_node_map **remote_nodemaps)
3151 {
3152         uint32_t *nodes;
3153
3154         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3155         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3156                                         nodes, 0,
3157                                         CONTROL_TIMEOUT(), false, tdb_null,
3158                                         async_getnodemap_callback,
3159                                         NULL,
3160                                         remote_nodemaps) != 0) {
3161                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3162
3163                 return -1;
3164         }
3165
3166         return 0;
3167 }
3168
3169 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3170 struct ctdb_check_reclock_state {
3171         struct ctdb_context *ctdb;
3172         struct timeval start_time;
3173         int fd[2];
3174         pid_t child;
3175         struct timed_event *te;
3176         struct fd_event *fde;
3177         enum reclock_child_status status;
3178 };
3179
3180 /* when we free the reclock state we must kill any child process.
3181 */
3182 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3183 {
3184         struct ctdb_context *ctdb = state->ctdb;
3185
3186         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3187
3188         if (state->fd[0] != -1) {
3189                 close(state->fd[0]);
3190                 state->fd[0] = -1;
3191         }
3192         if (state->fd[1] != -1) {
3193                 close(state->fd[1]);
3194                 state->fd[1] = -1;
3195         }
3196         ctdb_kill(ctdb, state->child, SIGKILL);
3197         return 0;
3198 }
3199
3200 /*
3201   called if our check_reclock child times out. this would happen if
3202   i/o to the reclock file blocks.
3203  */
3204 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3205                                          struct timeval t, void *private_data)
3206 {
3207         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3208                                            struct ctdb_check_reclock_state);
3209
3210         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3211         state->status = RECLOCK_TIMEOUT;
3212 }
3213
3214 /* this is called when the child process has completed checking the reclock
3215    file and has written data back to us through the pipe.
3216 */
3217 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3218                              uint16_t flags, void *private_data)
3219 {
3220         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3221                                              struct ctdb_check_reclock_state);
3222         char c = 0;
3223         int ret;
3224
3225         /* we got a response from our child process so we can abort the
3226            timeout.
3227         */
3228         talloc_free(state->te);
3229         state->te = NULL;
3230
3231         ret = read(state->fd[0], &c, 1);
3232         if (ret != 1 || c != RECLOCK_OK) {
3233                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3234                 state->status = RECLOCK_FAILED;
3235
3236                 return;
3237         }
3238
3239         state->status = RECLOCK_OK;
3240         return;
3241 }
3242
3243 static int check_recovery_lock(struct ctdb_context *ctdb)
3244 {
3245         int ret;
3246         struct ctdb_check_reclock_state *state;
3247         pid_t parent = getpid();
3248
3249         if (ctdb->recovery_lock_fd == -1) {
3250                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3251                 return -1;
3252         }
3253
3254         state = talloc(ctdb, struct ctdb_check_reclock_state);
3255         CTDB_NO_MEMORY(ctdb, state);
3256
3257         state->ctdb = ctdb;
3258         state->start_time = timeval_current();
3259         state->status = RECLOCK_CHECKING;
3260         state->fd[0] = -1;
3261         state->fd[1] = -1;
3262
3263         ret = pipe(state->fd);
3264         if (ret != 0) {
3265                 talloc_free(state);
3266                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3267                 return -1;
3268         }
3269
3270         state->child = ctdb_fork(ctdb);
3271         if (state->child == (pid_t)-1) {
3272                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3273                 close(state->fd[0]);
3274                 state->fd[0] = -1;
3275                 close(state->fd[1]);
3276                 state->fd[1] = -1;
3277                 talloc_free(state);
3278                 return -1;
3279         }
3280
3281         if (state->child == 0) {
3282                 char cc = RECLOCK_OK;
3283                 close(state->fd[0]);
3284                 state->fd[0] = -1;
3285
3286                 ctdb_set_process_name("ctdb_rec_reclock");
3287                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3288                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3289                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3290                         cc = RECLOCK_FAILED;
3291                 }
3292
3293                 write(state->fd[1], &cc, 1);
3294                 /* make sure we die when our parent dies */
3295                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3296                         sleep(5);
3297                 }
3298                 _exit(0);
3299         }
3300         close(state->fd[1]);
3301         state->fd[1] = -1;
3302         set_close_on_exec(state->fd[0]);
3303
3304         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3305
3306         talloc_set_destructor(state, check_reclock_destructor);
3307
3308         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3309                                     ctdb_check_reclock_timeout, state);
3310         if (state->te == NULL) {
3311                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3312                 talloc_free(state);
3313                 return -1;
3314         }
3315
3316         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3317                                 EVENT_FD_READ,
3318                                 reclock_child_handler,
3319                                 (void *)state);
3320
3321         if (state->fde == NULL) {
3322                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3323                 talloc_free(state);
3324                 return -1;
3325         }
3326         tevent_fd_set_auto_close(state->fde);
3327
3328         while (state->status == RECLOCK_CHECKING) {
3329                 event_loop_once(ctdb->ev);
3330         }
3331
3332         if (state->status == RECLOCK_FAILED) {
3333                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3334                 close(ctdb->recovery_lock_fd);
3335                 ctdb->recovery_lock_fd = -1;
3336                 talloc_free(state);
3337                 return -1;
3338         }
3339
3340         talloc_free(state);
3341         return 0;
3342 }
3343
3344 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3345 {
3346         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3347         const char *reclockfile;
3348
3349         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3350                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3351                 talloc_free(tmp_ctx);
3352                 return -1;      
3353         }
3354
3355         if (reclockfile == NULL) {
3356                 if (ctdb->recovery_lock_file != NULL) {
3357                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3358                         talloc_free(ctdb->recovery_lock_file);
3359                         ctdb->recovery_lock_file = NULL;
3360                         if (ctdb->recovery_lock_fd != -1) {
3361                                 close(ctdb->recovery_lock_fd);
3362                                 ctdb->recovery_lock_fd = -1;
3363                         }
3364                 }
3365                 ctdb->tunable.verify_recovery_lock = 0;
3366                 talloc_free(tmp_ctx);
3367                 return 0;
3368         }
3369
3370         if (ctdb->recovery_lock_file == NULL) {
3371                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3372                 if (ctdb->recovery_lock_fd != -1) {
3373                         close(ctdb->recovery_lock_fd);
3374                         ctdb->recovery_lock_fd = -1;
3375                 }
3376                 talloc_free(tmp_ctx);
3377                 return 0;
3378         }
3379
3380
3381         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3382                 talloc_free(tmp_ctx);
3383                 return 0;
3384         }
3385
3386         talloc_free(ctdb->recovery_lock_file);
3387         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3388         ctdb->tunable.verify_recovery_lock = 0;
3389         if (ctdb->recovery_lock_fd != -1) {
3390                 close(ctdb->recovery_lock_fd);
3391                 ctdb->recovery_lock_fd = -1;
3392         }
3393
3394         talloc_free(tmp_ctx);
3395         return 0;
3396 }
3397
3398 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3399                       TALLOC_CTX *mem_ctx)
3400 {
3401         uint32_t pnn;
3402         struct ctdb_node_map *nodemap=NULL;
3403         struct ctdb_node_map *recmaster_nodemap=NULL;
3404         struct ctdb_node_map **remote_nodemaps=NULL;
3405         struct ctdb_vnn_map *vnnmap=NULL;
3406         struct ctdb_vnn_map *remote_vnnmap=NULL;
3407         int32_t debug_level;
3408         int i, j, ret;
3409         bool self_ban;
3410
3411
3412         /* verify that the main daemon is still running */
3413         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3414                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3415                 exit(-1);
3416         }
3417
3418         /* ping the local daemon to tell it we are alive */
3419         ctdb_ctrl_recd_ping(ctdb);
3420
3421         if (rec->election_timeout) {
3422                 /* an election is in progress */
3423                 return;
3424         }
3425
3426         /* read the debug level from the parent and update locally */
3427         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3428         if (ret !=0) {
3429                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3430                 return;
3431         }
3432         LogLevel = debug_level;
3433
3434         /* get relevant tunables */
3435         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3436         if (ret != 0) {
3437                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3438                 return;
3439         }
3440
3441         /* get the current recovery lock file from the server */
3442         if (update_recovery_lock_file(ctdb) != 0) {
3443                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3444                 return;
3445         }
3446
3447         /* Make sure that if recovery lock verification becomes disabled when
3448            we close the file
3449         */
3450         if (ctdb->tunable.verify_recovery_lock == 0) {
3451                 if (ctdb->recovery_lock_fd != -1) {
3452                         close(ctdb->recovery_lock_fd);
3453                         ctdb->recovery_lock_fd = -1;
3454                 }
3455         }
3456
3457         pnn = ctdb_get_pnn(ctdb);
3458
3459         /* get the vnnmap */
3460         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3461         if (ret != 0) {
3462                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3463                 return;
3464         }
3465
3466
3467         /* get number of nodes */
3468         if (rec->nodemap) {
3469                 talloc_free(rec->nodemap);
3470                 rec->nodemap = NULL;
3471                 nodemap=NULL;
3472         }
3473         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3474         if (ret != 0) {
3475                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3476                 return;
3477         }
3478         nodemap = rec->nodemap;
3479
3480         /* remember our own node flags */
3481         rec->node_flags = nodemap->nodes[pnn].flags;
3482
3483         ban_misbehaving_nodes(rec, &self_ban);
3484         if (self_ban) {
3485                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3486                 return;
3487         }
3488
3489         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3490            also frozen and that the recmode is set to active.
3491         */
3492         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3493                 /* If this node has become inactive then we want to
3494                  * reduce the chances of it taking over the recovery
3495                  * master role when it becomes active again.  This
3496                  * helps to stabilise the recovery master role so that
3497                  * it stays on the most stable node.
3498                  */
3499                 rec->priority_time = timeval_current();
3500
3501                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3502                 if (ret != 0) {
3503                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3504                 }
3505                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3506                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3507
3508                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3509                         if (ret != 0) {
3510                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3511                                 return;
3512                         }
3513                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3514                         if (ret != 0) {
3515                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3516
3517                                 return;
3518                         }
3519                 }
3520
3521                 /* If this node is stopped or banned then it is not the recovery
3522                  * master, so don't do anything. This prevents stopped or banned
3523                  * node from starting election and sending unnecessary controls.
3524                  */
3525                 return;
3526         }
3527
3528         /* check which node is the recovery master */
3529         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3530         if (ret != 0) {
3531                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3532                 return;
3533         }
3534
3535         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3536         if (rec->recmaster != pnn) {
3537                 if (rec->ip_reallocate_ctx != NULL) {
3538                         talloc_free(rec->ip_reallocate_ctx);
3539                         rec->ip_reallocate_ctx = NULL;
3540                         rec->reallocate_callers = NULL;
3541                 }
3542         }
3543
3544         /* This is a special case.  When recovery daemon is started, recmaster
3545          * is set to -1.  If a node is not started in stopped state, then
3546          * start election to decide recovery master
3547          */
3548         if (rec->recmaster == (uint32_t)-1) {
3549                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3550                 force_election(rec, pnn, nodemap);
3551                 return;
3552         }
3553
3554         /* update the capabilities for all nodes */
3555         ret = update_capabilities(ctdb, nodemap);
3556         if (ret != 0) {
3557                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3558                 return;
3559         }
3560
3561         /*
3562          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3563          * but we have, then force an election and try to become the new
3564          * recmaster.
3565          */
3566         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3567             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3568              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3569                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3570                                   " but we (node %u) have - force an election\n",
3571                                   rec->recmaster, pnn));
3572                 force_election(rec, pnn, nodemap);
3573                 return;
3574         }
3575
3576         /* count how many active nodes there are */
3577         rec->num_active    = 0;
3578         rec->num_connected = 0;
3579         for (i=0; i<nodemap->num; i++) {
3580                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3581                         rec->num_active++;
3582                 }
3583                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3584                         rec->num_connected++;
3585                 }
3586         }
3587
3588
3589         /* verify that the recmaster node is still active */
3590         for (j=0; j<nodemap->num; j++) {
3591                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3592                         break;
3593                 }
3594         }
3595
3596         if (j == nodemap->num) {
3597                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3598                 force_election(rec, pnn, nodemap);
3599                 return;
3600         }
3601
3602         /* if recovery master is disconnected we must elect a new recmaster */
3603         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3604                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3605                 force_election(rec, pnn, nodemap);
3606                 return;
3607         }
3608
3609         /* get nodemap from the recovery master to check if it is inactive */
3610         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3611                                    mem_ctx, &recmaster_nodemap);
3612         if (ret != 0) {
3613                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3614                           nodemap->nodes[j].pnn));
3615                 return;
3616         }
3617
3618
3619         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3620             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3621                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3622                 /*
3623                  * update our nodemap to carry the recmaster's notion of
3624                  * its own flags, so that we don't keep freezing the
3625                  * inactive recmaster node...
3626                  */
3627                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3628                 force_election(rec, pnn, nodemap);
3629                 return;
3630         }
3631
3632         /* verify that we have all ip addresses we should have and we dont
3633          * have addresses we shouldnt have.
3634          */ 
3635         if (ctdb->tunable.disable_ip_failover == 0) {
3636                 if (rec->ip_check_disable_ctx == NULL) {
3637                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3638                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3639                         }
3640                 }
3641         }
3642
3643
3644         /* if we are not the recmaster then we do not need to check
3645            if recovery is needed
3646          */
3647         if (pnn != rec->recmaster) {
3648                 return;
3649         }
3650
3651
3652         /* ensure our local copies of flags are right */
3653         ret = update_local_flags(rec, nodemap);
3654         if (ret == MONITOR_ELECTION_NEEDED) {
3655                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3656                 force_election(rec, pnn, nodemap);
3657                 return;
3658         }
3659         if (ret != MONITOR_OK) {
3660                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3661                 return;
3662         }
3663
3664         if (ctdb->num_nodes != nodemap->num) {
3665                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3666                 reload_nodes_file(ctdb);
3667                 return;
3668         }
3669
3670         /* verify that all active nodes agree that we are the recmaster */
3671         switch (verify_recmaster(rec, nodemap, pnn)) {
3672         case MONITOR_RECOVERY_NEEDED:
3673                 /* can not happen */
3674                 return;
3675         case MONITOR_ELECTION_NEEDED:
3676                 force_election(rec, pnn, nodemap);
3677                 return;
3678         case MONITOR_OK:
3679                 break;
3680         case MONITOR_FAILED:
3681                 return;
3682         }
3683
3684
3685         if (rec->need_recovery) {
3686                 /* a previous recovery didn't finish */
3687                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3688                 return;
3689         }
3690
3691         /* verify that all active nodes are in normal mode 
3692            and not in recovery mode 
3693         */
3694         switch (verify_recmode(ctdb, nodemap)) {
3695         case MONITOR_RECOVERY_NEEDED:
3696                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3697                 return;
3698         case MONITOR_FAILED:
3699                 return;
3700         case MONITOR_ELECTION_NEEDED:
3701                 /* can not happen */
3702         case MONITOR_OK:
3703                 break;
3704         }
3705
3706
3707         if (ctdb->tunable.verify_recovery_lock != 0) {
3708                 /* we should have the reclock - check its not stale */
3709                 ret = check_recovery_lock(ctdb);
3710                 if (ret != 0) {
3711                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3712                         ctdb_set_culprit(rec, ctdb->pnn);
3713                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3714                         return;
3715                 }
3716         }
3717
3718
3719         /* is there a pending reload all ips ? */
3720         if (reload_all_ips_request != NULL) {
3721                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3722                 talloc_free(reload_all_ips_request);
3723                 reload_all_ips_request = NULL;
3724         }
3725
3726         /* if there are takeovers requested, perform it and notify the waiters */
3727         if (rec->reallocate_callers) {
3728                 process_ipreallocate_requests(ctdb, rec);
3729         }
3730
3731         /* get the nodemap for all active remote nodes
3732          */
3733         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3734         if (remote_nodemaps == NULL) {
3735                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3736                 return;
3737         }
3738         for(i=0; i<nodemap->num; i++) {
3739                 remote_nodemaps[i] = NULL;
3740         }
3741         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3742                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3743                 return;
3744         } 
3745
3746         /* verify that all other nodes have the same nodemap as we have
3747         */
3748         for (j=0; j<nodemap->num; j++) {
3749                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3750                         continue;
3751                 }
3752
3753                 if (remote_nodemaps[j] == NULL) {
3754                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3755                         ctdb_set_culprit(rec, j);
3756
3757                         return;
3758                 }
3759
3760                 /* if the nodes disagree on how many nodes there are
3761                    then this is a good reason to try recovery
3762                  */
3763                 if (remote_nodemaps[j]->num != nodemap->num) {
3764                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3765                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3766                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3767                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3768                         return;
3769                 }
3770
3771                 /* if the nodes disagree on which nodes exist and are
3772                    active, then that is also a good reason to do recovery
3773                  */
3774                 for (i=0;i<nodemap->num;i++) {
3775                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3776                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3777                                           nodemap->nodes[j].pnn, i, 
3778                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3779                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3780                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3781                                             vnnmap);
3782                                 return;
3783                         }
3784                 }
3785         }
3786
3787         /*
3788          * Update node flags obtained from each active node. This ensure we have
3789          * up-to-date information for all the nodes.
3790          */
3791         for (j=0; j<nodemap->num; j++) {
3792                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3793                         continue;
3794                 }
3795                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3796         }
3797
3798         for (j=0; j<nodemap->num; j++) {
3799                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3800                         continue;
3801                 }
3802
3803                 /* verify the flags are consistent
3804                 */
3805                 for (i=0; i<nodemap->num; i++) {
3806                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3807                                 continue;
3808                         }
3809                         
3810                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3811                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3812                                   nodemap->nodes[j].pnn, 
3813                                   nodemap->nodes[i].pnn, 
3814                                   remote_nodemaps[j]->nodes[i].flags,
3815                                   nodemap->nodes[i].flags));
3816                                 if (i == j) {
3817                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3818                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3819                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3820                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3821                                                     vnnmap);
3822                                         return;
3823                                 } else {
3824                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3825                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3826                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3827                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3828                                                     vnnmap);
3829                                         return;
3830                                 }
3831                         }
3832                 }
3833         }
3834
3835
3836         /* there better be the same number of lmasters in the vnn map
3837            as there are active nodes or we will have to do a recovery
3838          */
3839         if (vnnmap->size != rec->num_active) {
3840                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3841                           vnnmap->size, rec->num_active));
3842                 ctdb_set_culprit(rec, ctdb->pnn);
3843                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3844                 return;
3845         }
3846
3847         /* verify that all active nodes in the nodemap also exist in 
3848            the vnnmap.
3849          */
3850         for (j=0; j<nodemap->num; j++) {
3851                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3852                         continue;
3853                 }
3854                 if (nodemap->nodes[j].pnn == pnn) {
3855                         continue;
3856                 }
3857
3858                 for (i=0; i<vnnmap->size; i++) {
3859                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3860                                 break;
3861                         }
3862                 }
3863                 if (i == vnnmap->size) {
3864                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3865                                   nodemap->nodes[j].pnn));
3866                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3867                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3868                         return;
3869                 }
3870         }
3871
3872         
3873         /* verify that all other nodes have the same vnnmap
3874            and are from the same generation
3875          */
3876         for (j=0; j<nodemap->num; j++) {
3877                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3878                         continue;
3879                 }
3880                 if (nodemap->nodes[j].pnn == pnn) {
3881                         continue;
3882                 }
3883
3884                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3885                                           mem_ctx, &remote_vnnmap);
3886                 if (ret != 0) {
3887                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3888                                   nodemap->nodes[j].pnn));
3889                         return;
3890                 }
3891
3892                 /* verify the vnnmap generation is the same */
3893                 if (vnnmap->generation != remote_vnnmap->generation) {
3894                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3895                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3896                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3897                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3898                         return;
3899                 }
3900
3901                 /* verify the vnnmap size is the same */
3902                 if (vnnmap->size != remote_vnnmap->size) {
3903                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3904                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3905                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3906                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3907                         return;
3908                 }
3909
3910                 /* verify the vnnmap is the same */
3911                 for (i=0;i<vnnmap->size;i++) {
3912                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3913                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3914                                           nodemap->nodes[j].pnn));
3915                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3916                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3917                                             vnnmap);
3918                                 return;
3919                         }
3920                 }
3921         }
3922
3923         /* we might need to change who has what IP assigned */
3924         if (rec->need_takeover_run) {
3925                 uint32_t culprit = (uint32_t)-1;
3926
3927                 rec->need_takeover_run = false;
3928
3929                 /* update the list of public ips that a node can handle for
3930                    all connected nodes
3931                 */
3932                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3933                 if (ret != 0) {
3934                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3935                                          culprit));
3936                         rec->need_takeover_run = true;
3937                         return;
3938                 }
3939
3940                 /* execute the "startrecovery" event script on all nodes */
3941                 ret = run_startrecovery_eventscript(rec, nodemap);
3942                 if (ret!=0) {
3943                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3944                         ctdb_set_culprit(rec, ctdb->pnn);
3945                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3946                         return;
3947                 }
3948
3949                 /* If takeover run fails, then the offending nodes are
3950                  * assigned ban culprit counts. And we re-try takeover.
3951                  * If takeover run fails repeatedly, the node would get
3952                  * banned.
3953                  *
3954                  * If rec->need_takeover_run is not set to true at this
3955                  * failure, monitoring is disabled cluster-wide (via
3956                  * startrecovery eventscript) and will not get enabled.
3957                  */
3958                 if (!do_takeover_run(rec, nodemap, true)) {
3959                         return;
3960                 }
3961
3962                 /* execute the "recovered" event script on all nodes */
3963                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3964 #if 0
3965 // we cant check whether the event completed successfully
3966 // since this script WILL fail if the node is in recovery mode
3967 // and if that race happens, the code here would just cause a second
3968 // cascading recovery.
3969                 if (ret!=0) {
3970                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3971                         ctdb_set_culprit(rec, ctdb->pnn);
3972                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3973                 }
3974 #endif
3975         }
3976 }
3977
3978 /*
3979   the main monitoring loop
3980  */
3981 static void monitor_cluster(struct ctdb_context *ctdb)
3982 {
3983         struct ctdb_recoverd *rec;
3984
3985         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3986
3987         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3988         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3989
3990         rec->ctdb = ctdb;
3991
3992         rec->takeover_run_in_progress = false;
3993
3994         rec->priority_time = timeval_current();
3995
3996         /* register a message port for sending memory dumps */
3997         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3998
3999         /* register a message port for requesting logs */
4000         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4001
4002         /* register a message port for clearing logs */
4003         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4004
4005         /* register a message port for recovery elections */
4006         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4007
4008         /* when nodes are disabled/enabled */
4009         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4010
4011         /* when we are asked to puch out a flag change */
4012         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4013
4014         /* register a message port for vacuum fetch */
4015         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4016
4017         /* register a message port for reloadnodes  */
4018         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4019
4020         /* register a message port for performing a takeover run */
4021         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4022
4023         /* register a message port for performing a reload all ips */
4024         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
4025
4026         /* register a message port for disabling the ip check for a short while */
4027         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4028
4029         /* register a message port for updating the recovery daemons node assignment for an ip */
4030         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4031
4032         /* register a message port for forcing a rebalance of a node next
4033            reallocation */
4034         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4035
4036         for (;;) {
4037                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4038                 struct timeval start;
4039                 double elapsed;
4040
4041                 if (!mem_ctx) {
4042                         DEBUG(DEBUG_CRIT,(__location__
4043                                           " Failed to create temp context\n"));
4044                         exit(-1);
4045                 }
4046
4047                 start = timeval_current();
4048                 main_loop(ctdb, rec, mem_ctx);
4049                 talloc_free(mem_ctx);
4050
4051                 /* we only check for recovery once every second */
4052                 elapsed = timeval_elapsed(&start);
4053                 if (elapsed < ctdb->tunable.recover_interval) {
4054                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4055                                           - elapsed);
4056                 }
4057         }
4058 }
4059
4060 /*
4061   event handler for when the main ctdbd dies
4062  */
4063 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4064                                  uint16_t flags, void *private_data)
4065 {
4066         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4067         _exit(1);
4068 }
4069
4070 /*
4071   called regularly to verify that the recovery daemon is still running
4072  */
4073 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4074                               struct timeval yt, void *p)
4075 {
4076         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4077
4078         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4079                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4080
4081                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4082                                 ctdb_restart_recd, ctdb);
4083
4084                 return;
4085         }
4086
4087         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4088                         timeval_current_ofs(30, 0),
4089                         ctdb_check_recd, ctdb);
4090 }
4091
4092 static void recd_sig_child_handler(struct event_context *ev,
4093         struct signal_event *se, int signum, int count,
4094         void *dont_care, 
4095         void *private_data)
4096 {
4097 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4098         int status;
4099         pid_t pid = -1;
4100
4101         while (pid != 0) {
4102                 pid = waitpid(-1, &status, WNOHANG);
4103                 if (pid == -1) {
4104                         if (errno != ECHILD) {
4105                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4106                         }
4107                         return;
4108                 }
4109                 if (pid > 0) {
4110                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4111                 }
4112         }
4113 }
4114
4115 /*
4116   startup the recovery daemon as a child of the main ctdb daemon
4117  */
4118 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4119 {
4120         int fd[2];
4121         struct signal_event *se;
4122         struct tevent_fd *fde;
4123
4124         if (pipe(fd) != 0) {
4125                 return -1;
4126         }
4127
4128         ctdb->ctdbd_pid = getpid();
4129
4130         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4131         if (ctdb->recoverd_pid == -1) {
4132                 return -1;
4133         }
4134
4135         if (ctdb->recoverd_pid != 0) {
4136                 talloc_free(ctdb->recd_ctx);
4137                 ctdb->recd_ctx = talloc_new(ctdb);
4138                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4139
4140                 close(fd[0]);
4141                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4142                                 timeval_current_ofs(30, 0),
4143                                 ctdb_check_recd, ctdb);
4144                 return 0;
4145         }
4146
4147         close(fd[1]);
4148
4149         srandom(getpid() ^ time(NULL));
4150
4151         /* Clear the log ringbuffer */
4152         ctdb_clear_log(ctdb);
4153
4154         ctdb_set_process_name("ctdb_recovered");
4155         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4156                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4157                 exit(1);
4158         }
4159
4160         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4161
4162         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4163                      ctdb_recoverd_parent, &fd[0]);
4164         tevent_fd_set_auto_close(fde);
4165
4166         /* set up a handler to pick up sigchld */
4167         se = event_add_signal(ctdb->ev, ctdb,
4168                                      SIGCHLD, 0,
4169                                      recd_sig_child_handler,
4170                                      ctdb);
4171         if (se == NULL) {
4172                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4173                 exit(1);
4174         }
4175
4176         monitor_cluster(ctdb);
4177
4178         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4179         return -1;
4180 }
4181
4182 /*
4183   shutdown the recovery daemon
4184  */
4185 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4186 {
4187         if (ctdb->recoverd_pid == 0) {
4188                 return;
4189         }
4190
4191         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4192         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4193
4194         TALLOC_FREE(ctdb->recd_ctx);
4195         TALLOC_FREE(ctdb->recd_ping_count);
4196 }
4197
4198 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4199                        struct timeval t, void *private_data)
4200 {
4201         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4202
4203         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4204         ctdb_stop_recoverd(ctdb);
4205         ctdb_start_recoverd(ctdb);
4206 }