Revert "if a new node enters the cluster, that node will already be frozen at start"
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
/*
  One element of the doubly linked list of SRVID requests that are
  waiting to be processed.  The next/prev links are what the DLIST_*
  macros operate on.
 */
struct srvid_list {
        struct srvid_list *next;
        struct srvid_list *prev;
        struct srvid_request *request;  /* the queued request itself */
};
38
/*
  Container for a queue of SRVID requests; 'requests' is the head of a
  list of struct srvid_list elements (NULL while the queue is empty).
 */
struct srvid_requests {
        struct srvid_list *requests;    /* list head */
};
42
43 static void srvid_request_reply(struct ctdb_context *ctdb,
44                                 struct srvid_request *request,
45                                 TDB_DATA result)
46 {
47         /* Someone that sent srvid==0 does not want a reply */
48         if (request->srvid == 0) {
49                 talloc_free(request);
50                 return;
51         }
52
53         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
54                                      result) == 0) {
55                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
56                                   (unsigned)request->pnn,
57                                   (unsigned long long)request->srvid));
58         } else {
59                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
60                                  (unsigned)request->pnn,
61                                  (unsigned long long)request->srvid));
62         }
63
64         talloc_free(request);
65 }
66
67 static void srvid_requests_reply(struct ctdb_context *ctdb,
68                                  struct srvid_requests **requests,
69                                  TDB_DATA result)
70 {
71         struct srvid_list *r;
72
73         for (r = (*requests)->requests; r != NULL; r = r->next) {
74                 srvid_request_reply(ctdb, r->request, result);
75         }
76
77         /* Free the list structure... */
78         TALLOC_FREE(*requests);
79 }
80
81 static void srvid_request_add(struct ctdb_context *ctdb,
82                               struct srvid_requests **requests,
83                               struct srvid_request *request)
84 {
85         struct srvid_list *t;
86         int32_t ret;
87         TDB_DATA result;
88
89         if (*requests == NULL) {
90                 *requests = talloc_zero(ctdb, struct srvid_requests);
91                 if (*requests == NULL) {
92                         goto nomem;
93                 }
94         }
95
96         t = talloc_zero(*requests, struct srvid_list);
97         if (t == NULL) {
98                 /* If *requests was just allocated above then free it */
99                 if ((*requests)->requests == NULL) {
100                         TALLOC_FREE(*requests);
101                 }
102                 goto nomem;
103         }
104
105         t->request = (struct srvid_request *)talloc_steal(t, request);
106         DLIST_ADD((*requests)->requests, t);
107
108         return;
109
110 nomem:
111         /* Failed to add the request to the list.  Send a fail. */
112         DEBUG(DEBUG_ERR, (__location__
113                           " Out of memory, failed to queue SRVID request\n"));
114         ret = -ENOMEM;
115         result.dsize = sizeof(ret);
116         result.dptr = (uint8_t *)&ret;
117         srvid_request_reply(ctdb, request, result);
118 }
119
/*
  Misbehaviour record kept per node: 'count' accumulates the culprit
  credits charged to the node, 'last_reported_time' is when it was last
  reported as a culprit.
 */
struct ctdb_banning_state {
        uint32_t count;                         /* accumulated culprit credits */
        struct timeval last_reported_time;      /* time of the latest report */
};
124
125 /*
126   private state of recovery daemon
127  */
128 struct ctdb_recoverd {
129         struct ctdb_context *ctdb;
130         uint32_t recmaster;
131         uint32_t num_active;
132         uint32_t num_lmasters;
133         uint32_t num_connected;
134         uint32_t last_culprit_node;
135         struct ctdb_node_map *nodemap;
136         struct timeval priority_time;
137         bool need_takeover_run;
138         bool need_recovery;
139         uint32_t node_flags;
140         struct timed_event *send_election_te;
141         struct timed_event *election_timeout;
142         struct vacuum_info *vacuum_info;
143         struct srvid_requests *reallocate_requests;
144         bool takeover_run_in_progress;
145         TALLOC_CTX *takeover_runs_disable_ctx;
146         struct ctdb_control_get_ifaces *ifaces;
147         uint32_t *force_rebalance_nodes;
148 };
149
/*
  Timeout helpers.  Both expand to a timeval_current_ofs() call on a
  tunable and reference a variable named 'ctdb', which therefore has to
  be in scope wherever they are used.
 */
/* timeout built from the recover_timeout tunable */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
/* timeout built from the recover_interval tunable */
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* timed-event handler, defined later in this file */
static void ctdb_restart_recd(struct event_context *ev,
                              struct timed_event *te,
                              struct timeval t,
                              void *private_data);
154
155 /*
156   ban a node for a period of time
157  */
158 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
159 {
160         int ret;
161         struct ctdb_context *ctdb = rec->ctdb;
162         struct ctdb_ban_time bantime;
163        
164         if (!ctdb_validate_pnn(ctdb, pnn)) {
165                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
166                 return;
167         }
168
169         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
170
171         bantime.pnn  = pnn;
172         bantime.time = ban_time;
173
174         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
175         if (ret != 0) {
176                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
177                 return;
178         }
179
180 }
181
/* result codes of a monitoring step (consumers are further down in the file) */
enum monitor_result {
        MONITOR_OK,
        MONITOR_RECOVERY_NEEDED,
        MONITOR_ELECTION_NEEDED,
        MONITOR_FAILED
};
183
184
185 /*
186   remember the trouble maker
187  */
188 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
191         struct ctdb_banning_state *ban_state;
192
193         if (culprit > ctdb->num_nodes) {
194                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
195                 return;
196         }
197
198         /* If we are banned or stopped, do not set other nodes as culprits */
199         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
200                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
201                 return;
202         }
203
204         if (ctdb->nodes[culprit]->ban_state == NULL) {
205                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
206                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
207
208                 
209         }
210         ban_state = ctdb->nodes[culprit]->ban_state;
211         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
212                 /* this was the first time in a long while this node
213                    misbehaved so we will forgive any old transgressions.
214                 */
215                 ban_state->count = 0;
216         }
217
218         ban_state->count += count;
219         ban_state->last_reported_time = timeval_current();
220         rec->last_culprit_node = culprit;
221 }
222
/*
  Convenience wrapper: charge exactly one culprit credit to node
  'culprit' (see ctdb_set_culprit_count()).
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        const uint32_t one_credit = 1;

        ctdb_set_culprit_count(rec, culprit, one_credit);
}
230
231
232 /* this callback is called for every node that failed to execute the
233    recovered event
234 */
235 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
236 {
237         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
238
239         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
240
241         ctdb_set_culprit(rec, node_pnn);
242 }
243
244 /*
245   run the "recovered" eventscript on all nodes
246  */
247 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
248 {
249         TALLOC_CTX *tmp_ctx;
250         uint32_t *nodes;
251         struct ctdb_context *ctdb = rec->ctdb;
252
253         tmp_ctx = talloc_new(ctdb);
254         CTDB_NO_MEMORY(ctdb, tmp_ctx);
255
256         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
257         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
258                                         nodes, 0,
259                                         CONTROL_TIMEOUT(), false, tdb_null,
260                                         NULL, recovered_fail_callback,
261                                         rec) != 0) {
262                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
263
264                 talloc_free(tmp_ctx);
265                 return -1;
266         }
267
268         talloc_free(tmp_ctx);
269         return 0;
270 }
271
272 /* this callback is called for every node that failed to execute the
273    start recovery event
274 */
275 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
276 {
277         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
278
279         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
280
281         ctdb_set_culprit(rec, node_pnn);
282 }
283
284 /*
285   run the "startrecovery" eventscript on all nodes
286  */
287 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
288 {
289         TALLOC_CTX *tmp_ctx;
290         uint32_t *nodes;
291         struct ctdb_context *ctdb = rec->ctdb;
292
293         tmp_ctx = talloc_new(ctdb);
294         CTDB_NO_MEMORY(ctdb, tmp_ctx);
295
296         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
298                                         nodes, 0,
299                                         CONTROL_TIMEOUT(), false, tdb_null,
300                                         NULL,
301                                         startrecovery_fail_callback,
302                                         rec) != 0) {
303                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
304                 talloc_free(tmp_ctx);
305                 return -1;
306         }
307
308         talloc_free(tmp_ctx);
309         return 0;
310 }
311
312 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
313 {
314         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
315                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
316                 return;
317         }
318         if (node_pnn < ctdb->num_nodes) {
319                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
320         }
321
322         if (node_pnn == ctdb->pnn) {
323                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
324         }
325 }
326
327 /*
328   update the node capabilities for all connected nodes
329  */
330 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
331 {
332         uint32_t *nodes;
333         TALLOC_CTX *tmp_ctx;
334
335         tmp_ctx = talloc_new(ctdb);
336         CTDB_NO_MEMORY(ctdb, tmp_ctx);
337
338         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
340                                         nodes, 0,
341                                         CONTROL_TIMEOUT(),
342                                         false, tdb_null,
343                                         async_getcap_callback, NULL,
344                                         NULL) != 0) {
345                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
346                 talloc_free(tmp_ctx);
347                 return -1;
348         }
349
350         talloc_free(tmp_ctx);
351         return 0;
352 }
353
354 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
355 {
356         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
357
358         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
359         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
360 }
361
362 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
363 {
364         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
365
366         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
367         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
368 }
369
370 /*
371   change recovery mode on all nodes
372  */
373 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
374 {
375         TDB_DATA data;
376         uint32_t *nodes;
377         TALLOC_CTX *tmp_ctx;
378
379         tmp_ctx = talloc_new(ctdb);
380         CTDB_NO_MEMORY(ctdb, tmp_ctx);
381
382         /* freeze all nodes */
383         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
384         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
385                 int i;
386
387                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
388                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
389                                                 nodes, i,
390                                                 CONTROL_TIMEOUT(),
391                                                 false, tdb_null,
392                                                 NULL,
393                                                 set_recmode_fail_callback,
394                                                 rec) != 0) {
395                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
396                                 talloc_free(tmp_ctx);
397                                 return -1;
398                         }
399                 }
400         }
401
402
403         data.dsize = sizeof(uint32_t);
404         data.dptr = (unsigned char *)&rec_mode;
405
406         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
407                                         nodes, 0,
408                                         CONTROL_TIMEOUT(),
409                                         false, data,
410                                         NULL, NULL,
411                                         NULL) != 0) {
412                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
413                 talloc_free(tmp_ctx);
414                 return -1;
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
420
421 /*
422   change recovery master on all node
423  */
424 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
425 {
426         TDB_DATA data;
427         TALLOC_CTX *tmp_ctx;
428         uint32_t *nodes;
429
430         tmp_ctx = talloc_new(ctdb);
431         CTDB_NO_MEMORY(ctdb, tmp_ctx);
432
433         data.dsize = sizeof(uint32_t);
434         data.dptr = (unsigned char *)&pnn;
435
436         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
437         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
438                                         nodes, 0,
439                                         CONTROL_TIMEOUT(), false, data,
440                                         NULL, NULL,
441                                         NULL) != 0) {
442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
443                 talloc_free(tmp_ctx);
444                 return -1;
445         }
446
447         talloc_free(tmp_ctx);
448         return 0;
449 }
450
451 /* update all remote nodes to use the same db priority that we have
452    this can fail if the remove node has not yet been upgraded to 
453    support this function, so we always return success and never fail
454    a recovery if this call fails.
455 */
456 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
457         struct ctdb_node_map *nodemap, 
458         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int db;
461         uint32_t *nodes;
462
463         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
464
465         /* step through all local databases */
466         for (db=0; db<dbmap->num;db++) {
467                 TDB_DATA data;
468                 struct ctdb_db_priority db_prio;
469                 int ret;
470
471                 db_prio.db_id     = dbmap->dbs[db].dbid;
472                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
473                 if (ret != 0) {
474                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
475                         continue;
476                 }
477
478                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
479
480                 data.dptr  = (uint8_t *)&db_prio;
481                 data.dsize = sizeof(db_prio);
482
483                 if (ctdb_client_async_control(ctdb,
484                                         CTDB_CONTROL_SET_DB_PRIORITY,
485                                         nodes, 0,
486                                         CONTROL_TIMEOUT(), false, data,
487                                         NULL, NULL,
488                                         NULL) != 0) {
489                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
490                 }
491         }
492
493         return 0;
494 }                       
495
496 /*
497   ensure all other nodes have attached to any databases that we have
498  */
499 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
500                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
501 {
502         int i, j, db, ret;
503         struct ctdb_dbid_map *remote_dbmap;
504
505         /* verify that all other nodes have all our databases */
506         for (j=0; j<nodemap->num; j++) {
507                 /* we dont need to ourself ourselves */
508                 if (nodemap->nodes[j].pnn == pnn) {
509                         continue;
510                 }
511                 /* dont check nodes that are unavailable */
512                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
513                         continue;
514                 }
515
516                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
517                                          mem_ctx, &remote_dbmap);
518                 if (ret != 0) {
519                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
520                         return -1;
521                 }
522
523                 /* step through all local databases */
524                 for (db=0; db<dbmap->num;db++) {
525                         const char *name;
526
527
528                         for (i=0;i<remote_dbmap->num;i++) {
529                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
530                                         break;
531                                 }
532                         }
533                         /* the remote node already have this database */
534                         if (i!=remote_dbmap->num) {
535                                 continue;
536                         }
537                         /* ok so we need to create this database */
538                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
539                                             mem_ctx, &name);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
542                                 return -1;
543                         }
544                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
545                                            mem_ctx, name,
546                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
547                         if (ret != 0) {
548                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
549                                 return -1;
550                         }
551                 }
552         }
553
554         return 0;
555 }
556
557
558 /*
559   ensure we are attached to any databases that anyone else is attached to
560  */
561 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
562                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
563 {
564         int i, j, db, ret;
565         struct ctdb_dbid_map *remote_dbmap;
566
567         /* verify that we have all database any other node has */
568         for (j=0; j<nodemap->num; j++) {
569                 /* we dont need to ourself ourselves */
570                 if (nodemap->nodes[j].pnn == pnn) {
571                         continue;
572                 }
573                 /* dont check nodes that are unavailable */
574                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
575                         continue;
576                 }
577
578                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
579                                          mem_ctx, &remote_dbmap);
580                 if (ret != 0) {
581                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
582                         return -1;
583                 }
584
585                 /* step through all databases on the remote node */
586                 for (db=0; db<remote_dbmap->num;db++) {
587                         const char *name;
588
589                         for (i=0;i<(*dbmap)->num;i++) {
590                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
591                                         break;
592                                 }
593                         }
594                         /* we already have this db locally */
595                         if (i!=(*dbmap)->num) {
596                                 continue;
597                         }
598                         /* ok so we need to create this database and
599                            rebuild dbmap
600                          */
601                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
602                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
603                         if (ret != 0) {
604                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
605                                           nodemap->nodes[j].pnn));
606                                 return -1;
607                         }
608                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
609                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
610                         if (ret != 0) {
611                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
612                                 return -1;
613                         }
614                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
615                         if (ret != 0) {
616                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
617                                 return -1;
618                         }
619                 }
620         }
621
622         return 0;
623 }
624
625
626 /*
627   pull the remote database contents from one node into the recdb
628  */
629 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
630                                     struct tdb_wrap *recdb, uint32_t dbid)
631 {
632         int ret;
633         TDB_DATA outdata;
634         struct ctdb_marshall_buffer *reply;
635         struct ctdb_rec_data *rec;
636         int i;
637         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
638
639         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
640                                CONTROL_TIMEOUT(), &outdata);
641         if (ret != 0) {
642                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
643                 talloc_free(tmp_ctx);
644                 return -1;
645         }
646
647         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
648
649         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
650                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
651                 talloc_free(tmp_ctx);
652                 return -1;
653         }
654         
655         rec = (struct ctdb_rec_data *)&reply->data[0];
656         
657         for (i=0;
658              i<reply->count;
659              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
660                 TDB_DATA key, data;
661                 struct ctdb_ltdb_header *hdr;
662                 TDB_DATA existing;
663                 
664                 key.dptr = &rec->data[0];
665                 key.dsize = rec->keylen;
666                 data.dptr = &rec->data[key.dsize];
667                 data.dsize = rec->datalen;
668                 
669                 hdr = (struct ctdb_ltdb_header *)data.dptr;
670
671                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
672                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
673                         talloc_free(tmp_ctx);
674                         return -1;
675                 }
676
677                 /* fetch the existing record, if any */
678                 existing = tdb_fetch(recdb->tdb, key);
679                 
680                 if (existing.dptr != NULL) {
681                         struct ctdb_ltdb_header header;
682                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
683                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
684                                          (unsigned)existing.dsize, srcnode));
685                                 free(existing.dptr);
686                                 talloc_free(tmp_ctx);
687                                 return -1;
688                         }
689                         header = *(struct ctdb_ltdb_header *)existing.dptr;
690                         free(existing.dptr);
691                         if (!(header.rsn < hdr->rsn ||
692                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
693                                 continue;
694                         }
695                 }
696                 
697                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
698                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
699                         talloc_free(tmp_ctx);
700                         return -1;                              
701                 }
702         }
703
704         talloc_free(tmp_ctx);
705
706         return 0;
707 }
708
709
/*
  State shared by the GET_DB_SEQNUM callbacks while searching for the
  node holding the highest sequence number of a database.
 */
struct pull_seqnum_cbdata {
        int failed;             /* non-zero once any node has failed */
        uint32_t pnn;           /* node that reported 'seqnum' */
        uint64_t seqnum;        /* highest sequence number seen so far */
};
715
716 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
717 {
718         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
719         uint64_t seqnum;
720
721         if (cb_data->failed != 0) {
722                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
723                 return;
724         }
725
726         if (res != 0) {
727                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
728                 cb_data->failed = 1;
729                 return;
730         }
731
732         if (outdata.dsize != sizeof(uint64_t)) {
733                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
734                 cb_data->failed = -1;
735                 return;
736         }
737
738         seqnum = *((uint64_t *)outdata.dptr);
739
740         if (seqnum > cb_data->seqnum) {
741                 cb_data->seqnum = seqnum;
742                 cb_data->pnn = node_pnn;
743         }
744 }
745
746 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
747 {
748         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
749
750         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
751         cb_data->failed = 1;
752 }
753
754 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
755                                 struct ctdb_recoverd *rec, 
756                                 struct ctdb_node_map *nodemap, 
757                                 struct tdb_wrap *recdb, uint32_t dbid)
758 {
759         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
760         uint32_t *nodes;
761         TDB_DATA data;
762         uint32_t outdata[2];
763         struct pull_seqnum_cbdata *cb_data;
764
765         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
766
767         outdata[0] = dbid;
768         outdata[1] = 0;
769
770         data.dsize = sizeof(outdata);
771         data.dptr  = (uint8_t *)&outdata[0];
772
773         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
774         if (cb_data == NULL) {
775                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
776                 talloc_free(tmp_ctx);
777                 return -1;
778         }
779
780         cb_data->failed = 0;
781         cb_data->pnn    = -1;
782         cb_data->seqnum = 0;
783         
784         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
785         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
786                                         nodes, 0,
787                                         CONTROL_TIMEOUT(), false, data,
788                                         pull_seqnum_cb,
789                                         pull_seqnum_fail_cb,
790                                         cb_data) != 0) {
791                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
792
793                 talloc_free(tmp_ctx);
794                 return -1;
795         }
796
797         if (cb_data->failed != 0) {
798                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
799                 talloc_free(tmp_ctx);
800                 return -1;
801         }
802
803         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
804                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
805                 talloc_free(tmp_ctx);
806                 return -1;
807         }
808
809         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
810
811         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
812                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
813                 talloc_free(tmp_ctx);
814                 return -1;
815         }
816
817         talloc_free(tmp_ctx);
818         return 0;
819 }
820
821
822 /*
823   pull all the remote database contents into the recdb
824  */
825 static int pull_remote_database(struct ctdb_context *ctdb,
826                                 struct ctdb_recoverd *rec, 
827                                 struct ctdb_node_map *nodemap, 
828                                 struct tdb_wrap *recdb, uint32_t dbid,
829                                 bool persistent)
830 {
831         int j;
832
833         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
834                 int ret;
835                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
836                 if (ret == 0) {
837                         return 0;
838                 }
839         }
840
841         /* pull all records from all other nodes across onto this node
842            (this merges based on rsn)
843         */
844         for (j=0; j<nodemap->num; j++) {
845                 /* dont merge from nodes that are unavailable */
846                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
847                         continue;
848                 }
849                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
850                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
851                                  nodemap->nodes[j].pnn));
852                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
853                         return -1;
854                 }
855         }
856         
857         return 0;
858 }
859
860
861 /*
862   update flags on all active nodes
863  */
864 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
865 {
866         int ret;
867
868         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
869                 if (ret != 0) {
870                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
871                 return -1;
872         }
873
874         return 0;
875 }
876
877 /*
878   ensure all nodes have the same vnnmap we do
879  */
880 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
881                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
882 {
883         int j, ret;
884
885         /* push the new vnn map out to all the nodes */
886         for (j=0; j<nodemap->num; j++) {
887                 /* dont push to nodes that are unavailable */
888                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
889                         continue;
890                 }
891
892                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
893                 if (ret != 0) {
894                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
895                         return -1;
896                 }
897         }
898
899         return 0;
900 }
901
902
903 struct vacuum_info {
904         struct vacuum_info *next, *prev;
905         struct ctdb_recoverd *rec;
906         uint32_t srcnode;
907         struct ctdb_db_context *ctdb_db;
908         struct ctdb_marshall_buffer *recs;
909         struct ctdb_rec_data *r;
910 };
911
912 static void vacuum_fetch_next(struct vacuum_info *v);
913
914 /*
915   called when a vacuum fetch has completed - just free it and do the next one
916  */
917 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
918 {
919         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
920         talloc_free(state);
921         vacuum_fetch_next(v);
922 }
923
924
925 /*
926   process the next element from the vacuum list
927 */
928 static void vacuum_fetch_next(struct vacuum_info *v)
929 {
930         struct ctdb_call call;
931         struct ctdb_rec_data *r;
932
933         while (v->recs->count) {
934                 struct ctdb_client_call_state *state;
935                 TDB_DATA data;
936                 struct ctdb_ltdb_header *hdr;
937
938                 ZERO_STRUCT(call);
939                 call.call_id = CTDB_NULL_FUNC;
940                 call.flags = CTDB_IMMEDIATE_MIGRATION;
941                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
942
943                 r = v->r;
944                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
945                 v->recs->count--;
946
947                 call.key.dptr = &r->data[0];
948                 call.key.dsize = r->keylen;
949
950                 /* ensure we don't block this daemon - just skip a record if we can't get
951                    the chainlock */
952                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
953                         continue;
954                 }
955
956                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
957                 if (data.dptr == NULL) {
958                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
959                         continue;
960                 }
961
962                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
963                         free(data.dptr);
964                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
965                         continue;
966                 }
967                 
968                 hdr = (struct ctdb_ltdb_header *)data.dptr;
969                 if (hdr->dmaster == v->rec->ctdb->pnn) {
970                         /* its already local */
971                         free(data.dptr);
972                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
973                         continue;
974                 }
975
976                 free(data.dptr);
977
978                 state = ctdb_call_send(v->ctdb_db, &call);
979                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
980                 if (state == NULL) {
981                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
982                         talloc_free(v);
983                         return;
984                 }
985                 state->async.fn = vacuum_fetch_callback;
986                 state->async.private_data = v;
987                 return;
988         }
989
990         talloc_free(v);
991 }
992
993
994 /*
995   destroy a vacuum info structure
996  */
997 static int vacuum_info_destructor(struct vacuum_info *v)
998 {
999         DLIST_REMOVE(v->rec->vacuum_info, v);
1000         return 0;
1001 }
1002
1003
1004 /*
1005   handler for vacuum fetch
1006 */
1007 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1008                                  TDB_DATA data, void *private_data)
1009 {
1010         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1011         struct ctdb_marshall_buffer *recs;
1012         int ret, i;
1013         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1014         const char *name;
1015         struct ctdb_dbid_map *dbmap=NULL;
1016         bool persistent = false;
1017         struct ctdb_db_context *ctdb_db;
1018         struct ctdb_rec_data *r;
1019         uint32_t srcnode;
1020         struct vacuum_info *v;
1021
1022         recs = (struct ctdb_marshall_buffer *)data.dptr;
1023         r = (struct ctdb_rec_data *)&recs->data[0];
1024
1025         if (recs->count == 0) {
1026                 talloc_free(tmp_ctx);
1027                 return;
1028         }
1029
1030         srcnode = r->reqid;
1031
1032         for (v=rec->vacuum_info;v;v=v->next) {
1033                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
1034                         /* we're already working on records from this node */
1035                         talloc_free(tmp_ctx);
1036                         return;
1037                 }
1038         }
1039
1040         /* work out if the database is persistent */
1041         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1042         if (ret != 0) {
1043                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1044                 talloc_free(tmp_ctx);
1045                 return;
1046         }
1047
1048         for (i=0;i<dbmap->num;i++) {
1049                 if (dbmap->dbs[i].dbid == recs->db_id) {
1050                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1051                         break;
1052                 }
1053         }
1054         if (i == dbmap->num) {
1055                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1056                 talloc_free(tmp_ctx);
1057                 return;         
1058         }
1059
1060         /* find the name of this database */
1061         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1062                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1063                 talloc_free(tmp_ctx);
1064                 return;
1065         }
1066
1067         /* attach to it */
1068         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1069         if (ctdb_db == NULL) {
1070                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1071                 talloc_free(tmp_ctx);
1072                 return;
1073         }
1074
1075         v = talloc_zero(rec, struct vacuum_info);
1076         if (v == NULL) {
1077                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1078                 talloc_free(tmp_ctx);
1079                 return;
1080         }
1081
1082         v->rec = rec;
1083         v->srcnode = srcnode;
1084         v->ctdb_db = ctdb_db;
1085         v->recs = talloc_memdup(v, recs, data.dsize);
1086         if (v->recs == NULL) {
1087                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1088                 talloc_free(v);
1089                 talloc_free(tmp_ctx);
1090                 return;         
1091         }
1092         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1093
1094         DLIST_ADD(rec->vacuum_info, v);
1095
1096         talloc_set_destructor(v, vacuum_info_destructor);
1097
1098         vacuum_fetch_next(v);
1099         talloc_free(tmp_ctx);
1100 }
1101
1102
1103 /*
1104   called when ctdb_wait_timeout should finish
1105  */
1106 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1107                               struct timeval yt, void *p)
1108 {
1109         uint32_t *timed_out = (uint32_t *)p;
1110         (*timed_out) = 1;
1111 }
1112
1113 /*
1114   wait for a given number of seconds
1115  */
1116 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1117 {
1118         uint32_t timed_out = 0;
1119         time_t usecs = (secs - (time_t)secs) * 1000000;
1120         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1121         while (!timed_out) {
1122                 event_loop_once(ctdb->ev);
1123         }
1124 }
1125
1126 /*
1127   called when an election times out (ends)
1128  */
1129 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1130                                   struct timeval t, void *p)
1131 {
1132         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1133         rec->election_timeout = NULL;
1134         fast_start = false;
1135
1136         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1137 }
1138
1139
1140 /*
1141   wait for an election to finish. It finished election_timeout seconds after
1142   the last election packet is received
1143  */
1144 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1145 {
1146         struct ctdb_context *ctdb = rec->ctdb;
1147         while (rec->election_timeout) {
1148                 event_loop_once(ctdb->ev);
1149         }
1150 }
1151
1152 /*
1153   Update our local flags from all remote connected nodes. 
1154   This is only run when we are or we belive we are the recovery master
1155  */
1156 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1157 {
1158         int j;
1159         struct ctdb_context *ctdb = rec->ctdb;
1160         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1161
1162         /* get the nodemap for all active remote nodes and verify
1163            they are the same as for this node
1164          */
1165         for (j=0; j<nodemap->num; j++) {
1166                 struct ctdb_node_map *remote_nodemap=NULL;
1167                 int ret;
1168
1169                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1170                         continue;
1171                 }
1172                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1173                         continue;
1174                 }
1175
1176                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1177                                            mem_ctx, &remote_nodemap);
1178                 if (ret != 0) {
1179                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1180                                   nodemap->nodes[j].pnn));
1181                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1182                         talloc_free(mem_ctx);
1183                         return MONITOR_FAILED;
1184                 }
1185                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1186                         /* We should tell our daemon about this so it
1187                            updates its flags or else we will log the same 
1188                            message again in the next iteration of recovery.
1189                            Since we are the recovery master we can just as
1190                            well update the flags on all nodes.
1191                         */
1192                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1193                         if (ret != 0) {
1194                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1195                                 return -1;
1196                         }
1197
1198                         /* Update our local copy of the flags in the recovery
1199                            daemon.
1200                         */
1201                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1202                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1203                                  nodemap->nodes[j].flags));
1204                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1205                 }
1206                 talloc_free(remote_nodemap);
1207         }
1208         talloc_free(mem_ctx);
1209         return MONITOR_OK;
1210 }
1211
1212
1213 /* Create a new random generation ip. 
1214    The generation id can not be the INVALID_GENERATION id
1215 */
1216 static uint32_t new_generation(void)
1217 {
1218         uint32_t generation;
1219
1220         while (1) {
1221                 generation = random();
1222
1223                 if (generation != INVALID_GENERATION) {
1224                         break;
1225                 }
1226         }
1227
1228         return generation;
1229 }
1230
1231
1232 /*
1233   create a temporary working database
1234  */
1235 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1236 {
1237         char *name;
1238         struct tdb_wrap *recdb;
1239         unsigned tdb_flags;
1240
1241         /* open up the temporary recovery database */
1242         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1243                                ctdb->db_directory_state,
1244                                ctdb->pnn);
1245         if (name == NULL) {
1246                 return NULL;
1247         }
1248         unlink(name);
1249
1250         tdb_flags = TDB_NOLOCK;
1251         if (ctdb->valgrinding) {
1252                 tdb_flags |= TDB_NOMMAP;
1253         }
1254         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1255
1256         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1257                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1258         if (recdb == NULL) {
1259                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1260         }
1261
1262         talloc_free(name);
1263
1264         return recdb;
1265 }
1266
1267
1268 /* 
1269    a traverse function for pulling all relevant records from recdb
1270  */
1271 struct recdb_data {
1272         struct ctdb_context *ctdb;
1273         struct ctdb_marshall_buffer *recdata;
1274         uint32_t len;
1275         uint32_t allocated_len;
1276         bool failed;
1277         bool persistent;
1278 };
1279
1280 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1281 {
1282         struct recdb_data *params = (struct recdb_data *)p;
1283         struct ctdb_rec_data *rec;
1284         struct ctdb_ltdb_header *hdr;
1285
1286         /*
1287          * skip empty records - but NOT for persistent databases:
1288          *
1289          * The record-by-record mode of recovery deletes empty records.
1290          * For persistent databases, this can lead to data corruption
1291          * by deleting records that should be there:
1292          *
1293          * - Assume the cluster has been running for a while.
1294          *
1295          * - A record R in a persistent database has been created and
1296          *   deleted a couple of times, the last operation being deletion,
1297          *   leaving an empty record with a high RSN, say 10.
1298          *
1299          * - Now a node N is turned off.
1300          *
1301          * - This leaves the local database copy of D on N with the empty
1302          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1303          *   the copy of record R.
1304          *
1305          * - Now the record is created again while node N is turned off.
1306          *   This creates R with RSN = 1 on all nodes except for N.
1307          *
1308          * - Now node N is turned on again. The following recovery will chose
1309          *   the older empty copy of R due to RSN 10 > RSN 1.
1310          *
1311          * ==> Hence the record is gone after the recovery.
1312          *
1313          * On databases like Samba's registry, this can damage the higher-level
1314          * data structures built from the various tdb-level records.
1315          */
1316         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1317                 return 0;
1318         }
1319
1320         /* update the dmaster field to point to us */
1321         hdr = (struct ctdb_ltdb_header *)data.dptr;
1322         if (!params->persistent) {
1323                 hdr->dmaster = params->ctdb->pnn;
1324                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1325         }
1326
1327         /* add the record to the blob ready to send to the nodes */
1328         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1329         if (rec == NULL) {
1330                 params->failed = true;
1331                 return -1;
1332         }
1333         if (params->len + rec->length >= params->allocated_len) {
1334                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1335                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1336         }
1337         if (params->recdata == NULL) {
1338                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1339                          rec->length + params->len));
1340                 params->failed = true;
1341                 return -1;
1342         }
1343         params->recdata->count++;
1344         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1345         params->len += rec->length;
1346         talloc_free(rec);
1347
1348         return 0;
1349 }
1350
1351 /*
1352   push the recdb database out to all nodes
1353  */
1354 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1355                                bool persistent,
1356                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1357 {
1358         struct recdb_data params;
1359         struct ctdb_marshall_buffer *recdata;
1360         TDB_DATA outdata;
1361         TALLOC_CTX *tmp_ctx;
1362         uint32_t *nodes;
1363
1364         tmp_ctx = talloc_new(ctdb);
1365         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1366
1367         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1368         CTDB_NO_MEMORY(ctdb, recdata);
1369
1370         recdata->db_id = dbid;
1371
1372         params.ctdb = ctdb;
1373         params.recdata = recdata;
1374         params.len = offsetof(struct ctdb_marshall_buffer, data);
1375         params.allocated_len = params.len;
1376         params.failed = false;
1377         params.persistent = persistent;
1378
1379         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1380                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1381                 talloc_free(params.recdata);
1382                 talloc_free(tmp_ctx);
1383                 return -1;
1384         }
1385
1386         if (params.failed) {
1387                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1388                 talloc_free(params.recdata);
1389                 talloc_free(tmp_ctx);
1390                 return -1;              
1391         }
1392
1393         recdata = params.recdata;
1394
1395         outdata.dptr = (void *)recdata;
1396         outdata.dsize = params.len;
1397
1398         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1399         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1400                                         nodes, 0,
1401                                         CONTROL_TIMEOUT(), false, outdata,
1402                                         NULL, NULL,
1403                                         NULL) != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1405                 talloc_free(recdata);
1406                 talloc_free(tmp_ctx);
1407                 return -1;
1408         }
1409
1410         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1411                   dbid, recdata->count));
1412
1413         talloc_free(recdata);
1414         talloc_free(tmp_ctx);
1415
1416         return 0;
1417 }
1418
1419
1420 /*
1421   go through a full recovery on one database 
1422  */
1423 static int recover_database(struct ctdb_recoverd *rec, 
1424                             TALLOC_CTX *mem_ctx,
1425                             uint32_t dbid,
1426                             bool persistent,
1427                             uint32_t pnn, 
1428                             struct ctdb_node_map *nodemap,
1429                             uint32_t transaction_id)
1430 {
1431         struct tdb_wrap *recdb;
1432         int ret;
1433         struct ctdb_context *ctdb = rec->ctdb;
1434         TDB_DATA data;
1435         struct ctdb_control_wipe_database w;
1436         uint32_t *nodes;
1437
1438         recdb = create_recdb(ctdb, mem_ctx);
1439         if (recdb == NULL) {
1440                 return -1;
1441         }
1442
1443         /* pull all remote databases onto the recdb */
1444         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1445         if (ret != 0) {
1446                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1447                 return -1;
1448         }
1449
1450         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1451
1452         /* wipe all the remote databases. This is safe as we are in a transaction */
1453         w.db_id = dbid;
1454         w.transaction_id = transaction_id;
1455
1456         data.dptr = (void *)&w;
1457         data.dsize = sizeof(w);
1458
1459         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1460         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1461                                         nodes, 0,
1462                                         CONTROL_TIMEOUT(), false, data,
1463                                         NULL, NULL,
1464                                         NULL) != 0) {
1465                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1466                 talloc_free(recdb);
1467                 return -1;
1468         }
1469         
1470         /* push out the correct database. This sets the dmaster and skips 
1471            the empty records */
1472         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1473         if (ret != 0) {
1474                 talloc_free(recdb);
1475                 return -1;
1476         }
1477
1478         /* all done with this database */
1479         talloc_free(recdb);
1480
1481         return 0;
1482 }
1483
1484 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1485                                          struct ctdb_recoverd *rec,
1486                                          struct ctdb_node_map *nodemap,
1487                                          uint32_t *culprit)
1488 {
1489         int j;
1490         int ret;
1491
1492         if (ctdb->num_nodes != nodemap->num) {
1493                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1494                                   ctdb->num_nodes, nodemap->num));
1495                 if (culprit) {
1496                         *culprit = ctdb->pnn;
1497                 }
1498                 return -1;
1499         }
1500
1501         for (j=0; j<nodemap->num; j++) {
1502                 /* For readability */
1503                 struct ctdb_node *node = ctdb->nodes[j];
1504
1505                 /* release any existing data */
1506                 if (node->known_public_ips) {
1507                         talloc_free(node->known_public_ips);
1508                         node->known_public_ips = NULL;
1509                 }
1510                 if (node->available_public_ips) {
1511                         talloc_free(node->available_public_ips);
1512                         node->available_public_ips = NULL;
1513                 }
1514
1515                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1516                         continue;
1517                 }
1518
1519                 /* Retrieve the list of known public IPs from the node */
1520                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1521                                         CONTROL_TIMEOUT(),
1522                                         node->pnn,
1523                                         ctdb->nodes,
1524                                         0,
1525                                         &node->known_public_ips);
1526                 if (ret != 0) {
1527                         DEBUG(DEBUG_ERR,
1528                               ("Failed to read known public IPs from node: %u\n",
1529                                node->pnn));
1530                         if (culprit) {
1531                                 *culprit = node->pnn;
1532                         }
1533                         return -1;
1534                 }
1535
1536                 if (ctdb->do_checkpublicip &&
1537                     rec->takeover_runs_disable_ctx == NULL &&
1538                     verify_remote_ip_allocation(ctdb,
1539                                                  node->known_public_ips,
1540                                                  node->pnn)) {
1541                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1542                         rec->need_takeover_run = true;
1543                 }
1544
1545                 /* Retrieve the list of available public IPs from the node */
1546                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1547                                         CONTROL_TIMEOUT(),
1548                                         node->pnn,
1549                                         ctdb->nodes,
1550                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1551                                         &node->available_public_ips);
1552                 if (ret != 0) {
1553                         DEBUG(DEBUG_ERR,
1554                               ("Failed to read available public IPs from node: %u\n",
1555                                node->pnn));
1556                         if (culprit) {
1557                                 *culprit = node->pnn;
1558                         }
1559                         return -1;
1560                 }
1561         }
1562
1563         return 0;
1564 }
1565
1566 /* when we start a recovery, make sure all nodes use the same reclock file
1567    setting
1568 */
1569 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1570 {
1571         struct ctdb_context *ctdb = rec->ctdb;
1572         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1573         TDB_DATA data;
1574         uint32_t *nodes;
1575
1576         if (ctdb->recovery_lock_file == NULL) {
1577                 data.dptr  = NULL;
1578                 data.dsize = 0;
1579         } else {
1580                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1581                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1582         }
1583
1584         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1585         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1586                                         nodes, 0,
1587                                         CONTROL_TIMEOUT(),
1588                                         false, data,
1589                                         NULL, NULL,
1590                                         rec) != 0) {
1591                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1592                 talloc_free(tmp_ctx);
1593                 return -1;
1594         }
1595
1596         talloc_free(tmp_ctx);
1597         return 0;
1598 }
1599
1600
1601 /*
1602  * this callback is called for every node that failed to execute ctdb_takeover_run()
1603  * and set flag to re-run takeover run.
1604  */
1605 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1606 {
1607         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1608
1609         if (callback_data != NULL) {
1610                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1611
1612                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1613
1614                 ctdb_set_culprit(rec, node_pnn);
1615         }
1616 }
1617
1618
1619 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1620 {
1621         struct ctdb_context *ctdb = rec->ctdb;
1622         int i;
1623         struct ctdb_banning_state *ban_state;
1624
1625         *self_ban = false;
1626         for (i=0; i<ctdb->num_nodes; i++) {
1627                 if (ctdb->nodes[i]->ban_state == NULL) {
1628                         continue;
1629                 }
1630                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1631                 if (ban_state->count < 2*ctdb->num_nodes) {
1632                         continue;
1633                 }
1634
1635                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1636                         ctdb->nodes[i]->pnn, ban_state->count,
1637                         ctdb->tunable.recovery_ban_period));
1638                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1639                 ban_state->count = 0;
1640
1641                 /* Banning ourself? */
1642                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1643                         *self_ban = true;
1644                 }
1645         }
1646 }
1647
1648 static bool do_takeover_run(struct ctdb_recoverd *rec,
1649                             struct ctdb_node_map *nodemap,
1650                             bool banning_credits_on_fail)
1651 {
1652         uint32_t *nodes = NULL;
1653         struct srvid_request dtr;
1654         TDB_DATA data;
1655         int i;
1656         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1657         int ret;
1658         bool ok;
1659
1660         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1661
1662         if (rec->takeover_run_in_progress) {
1663                 DEBUG(DEBUG_ERR, (__location__
1664                                   " takeover run already in progress \n"));
1665                 ok = false;
1666                 goto done;
1667         }
1668
1669         rec->takeover_run_in_progress = true;
1670
1671         /* If takeover runs are in disabled then fail... */
1672         if (rec->takeover_runs_disable_ctx != NULL) {
1673                 DEBUG(DEBUG_ERR,
1674                       ("Takeover runs are disabled so refusing to run one\n"));
1675                 ok = false;
1676                 goto done;
1677         }
1678
1679         /* Disable IP checks (takeover runs, really) on other nodes
1680          * while doing this takeover run.  This will stop those other
1681          * nodes from triggering takeover runs when think they should
1682          * be hosting an IP but it isn't yet on an interface.  Don't
1683          * wait for replies since a failure here might cause some
1684          * noise in the logs but will not actually cause a problem.
1685          */
1686         dtr.srvid = 0; /* No reply */
1687         dtr.pnn = -1;
1688
1689         data.dptr  = (uint8_t*)&dtr;
1690         data.dsize = sizeof(dtr);
1691
1692         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1693
1694         /* Disable for 60 seconds.  This can be a tunable later if
1695          * necessary.
1696          */
1697         dtr.data = 60;
1698         for (i = 0; i < talloc_array_length(nodes); i++) {
1699                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1700                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1701                                              data) != 0) {
1702                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1703                 }
1704         }
1705
1706         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1707                                 rec->force_rebalance_nodes,
1708                                 takeover_fail_callback,
1709                                 banning_credits_on_fail ? rec : NULL);
1710
1711         /* Reenable takeover runs and IP checks on other nodes */
1712         dtr.data = 0;
1713         for (i = 0; i < talloc_array_length(nodes); i++) {
1714                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1715                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1716                                              data) != 0) {
1717                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1718                 }
1719         }
1720
1721         if (ret != 0) {
1722                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1723                 ok = false;
1724                 goto done;
1725         }
1726
1727         ok = true;
1728         /* Takeover run was successful so clear force rebalance targets */
1729         if (rebalance_nodes == rec->force_rebalance_nodes) {
1730                 TALLOC_FREE(rec->force_rebalance_nodes);
1731         } else {
1732                 DEBUG(DEBUG_WARNING,
1733                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1734         }
1735 done:
1736         rec->need_takeover_run = !ok;
1737         talloc_free(nodes);
1738         rec->takeover_run_in_progress = false;
1739
1740         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1741         return ok;
1742 }
1743
1744
1745 /*
1746   we are the recmaster, and recovery is needed - start a recovery run
1747  */
1748 static int do_recovery(struct ctdb_recoverd *rec, 
1749                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1750                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1751 {
1752         struct ctdb_context *ctdb = rec->ctdb;
1753         int i, j, ret;
1754         uint32_t generation;
1755         struct ctdb_dbid_map *dbmap;
1756         TDB_DATA data;
1757         uint32_t *nodes;
1758         struct timeval start_time;
1759         uint32_t culprit = (uint32_t)-1;
1760         bool self_ban;
1761
1762         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1763
1764         /* if recovery fails, force it again */
1765         rec->need_recovery = true;
1766
1767         ban_misbehaving_nodes(rec, &self_ban);
1768         if (self_ban) {
1769                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1770                 return -1;
1771         }
1772
1773         if (ctdb->tunable.verify_recovery_lock != 0) {
1774                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1775                 start_time = timeval_current();
1776                 if (!ctdb_recovery_lock(ctdb, true)) {
1777                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1778                                          "and ban ourself for %u seconds\n",
1779                                          ctdb->tunable.recovery_ban_period));
1780                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1781                         return -1;
1782                 }
1783                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1784                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1785         }
1786
1787         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1788
1789         /* get a list of all databases */
1790         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1791         if (ret != 0) {
1792                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1793                 return -1;
1794         }
1795
1796         /* we do the db creation before we set the recovery mode, so the freeze happens
1797            on all databases we will be dealing with. */
1798
1799         /* verify that we have all the databases any other node has */
1800         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1801         if (ret != 0) {
1802                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1803                 return -1;
1804         }
1805
1806         /* verify that all other nodes have all our databases */
1807         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1808         if (ret != 0) {
1809                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1810                 return -1;
1811         }
1812         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1813
1814         /* update the database priority for all remote databases */
1815         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1816         if (ret != 0) {
1817                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1818         }
1819         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1820
1821
1822         /* update all other nodes to use the same setting for reclock files
1823            as the local recovery master.
1824         */
1825         sync_recovery_lock_file_across_cluster(rec);
1826
1827         /* set recovery mode to active on all nodes */
1828         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1829         if (ret != 0) {
1830                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1831                 return -1;
1832         }
1833
1834         /* execute the "startrecovery" event script on all nodes */
1835         ret = run_startrecovery_eventscript(rec, nodemap);
1836         if (ret!=0) {
1837                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1838                 return -1;
1839         }
1840
1841         /*
1842           update all nodes to have the same flags that we have
1843          */
1844         for (i=0;i<nodemap->num;i++) {
1845                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1846                         continue;
1847                 }
1848
1849                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1850                 if (ret != 0) {
1851                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1852                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1853                         } else {
1854                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1855                                 return -1;
1856                         }
1857                 }
1858         }
1859
1860         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1861
1862         /* pick a new generation number */
1863         generation = new_generation();
1864
1865         /* change the vnnmap on this node to use the new generation 
1866            number but not on any other nodes.
1867            this guarantees that if we abort the recovery prematurely
1868            for some reason (a node stops responding?)
1869            that we can just return immediately and we will reenter
1870            recovery shortly again.
1871            I.e. we deliberately leave the cluster with an inconsistent
1872            generation id to allow us to abort recovery at any stage and
1873            just restart it from scratch.
1874          */
1875         vnnmap->generation = generation;
1876         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1877         if (ret != 0) {
1878                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1879                 return -1;
1880         }
1881
1882         data.dptr = (void *)&generation;
1883         data.dsize = sizeof(uint32_t);
1884
1885         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1886         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1887                                         nodes, 0,
1888                                         CONTROL_TIMEOUT(), false, data,
1889                                         NULL,
1890                                         transaction_start_fail_callback,
1891                                         rec) != 0) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1893                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1894                                         nodes, 0,
1895                                         CONTROL_TIMEOUT(), false, tdb_null,
1896                                         NULL,
1897                                         NULL,
1898                                         NULL) != 0) {
1899                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1900                 }
1901                 return -1;
1902         }
1903
1904         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1905
1906         for (i=0;i<dbmap->num;i++) {
1907                 ret = recover_database(rec, mem_ctx,
1908                                        dbmap->dbs[i].dbid,
1909                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1910                                        pnn, nodemap, generation);
1911                 if (ret != 0) {
1912                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1913                         return -1;
1914                 }
1915         }
1916
1917         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1918
1919         /* commit all the changes */
1920         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1921                                         nodes, 0,
1922                                         CONTROL_TIMEOUT(), false, data,
1923                                         NULL, NULL,
1924                                         NULL) != 0) {
1925                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1926                 return -1;
1927         }
1928
1929         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1930         
1931
1932         /* update the capabilities for all nodes */
1933         ret = update_capabilities(ctdb, nodemap);
1934         if (ret!=0) {
1935                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1936                 return -1;
1937         }
1938
1939         /* build a new vnn map with all the currently active and
1940            unbanned nodes */
1941         generation = new_generation();
1942         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1943         CTDB_NO_MEMORY(ctdb, vnnmap);
1944         vnnmap->generation = generation;
1945         vnnmap->size = 0;
1946         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1947         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1948         for (i=j=0;i<nodemap->num;i++) {
1949                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1950                         continue;
1951                 }
1952                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1953                         /* this node can not be an lmaster */
1954                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1955                         continue;
1956                 }
1957
1958                 vnnmap->size++;
1959                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1960                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1961                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1962
1963         }
1964         if (vnnmap->size == 0) {
1965                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1966                 vnnmap->size++;
1967                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1968                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1969                 vnnmap->map[0] = pnn;
1970         }       
1971
1972         /* update to the new vnnmap on all nodes */
1973         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1974         if (ret != 0) {
1975                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1976                 return -1;
1977         }
1978
1979         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1980
1981         /* update recmaster to point to us for all nodes */
1982         ret = set_recovery_master(ctdb, nodemap, pnn);
1983         if (ret!=0) {
1984                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1985                 return -1;
1986         }
1987
1988         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1989
1990         /*
1991           update all nodes to have the same flags that we have
1992          */
1993         for (i=0;i<nodemap->num;i++) {
1994                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1995                         continue;
1996                 }
1997
1998                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1999                 if (ret != 0) {
2000                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2001                         return -1;
2002                 }
2003         }
2004
2005         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2006
2007         /* disable recovery mode */
2008         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2009         if (ret != 0) {
2010                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2011                 return -1;
2012         }
2013
2014         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2015
2016         /* Fetch known/available public IPs from each active node */
2017         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2018         if (ret != 0) {
2019                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2020                                  culprit));
2021                 rec->need_takeover_run = true;
2022                 return -1;
2023         }
2024
2025         do_takeover_run(rec, nodemap, false);
2026
2027         /* execute the "recovered" event script on all nodes */
2028         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2029         if (ret!=0) {
2030                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2031                 return -1;
2032         }
2033
2034         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2035
2036         /* send a message to all clients telling them that the cluster 
2037            has been reconfigured */
2038         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
2039
2040         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2041
2042         rec->need_recovery = false;
2043
2044         /* we managed to complete a full recovery, make sure to forgive
2045            any past sins by the nodes that could now participate in the
2046            recovery.
2047         */
2048         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2049         for (i=0;i<nodemap->num;i++) {
2050                 struct ctdb_banning_state *ban_state;
2051
2052                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2053                         continue;
2054                 }
2055
2056                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2057                 if (ban_state == NULL) {
2058                         continue;
2059                 }
2060
2061                 ban_state->count = 0;
2062         }
2063
2064
2065         /* We just finished a recovery successfully. 
2066            We now wait for rerecovery_timeout before we allow 
2067            another recovery to take place.
2068         */
2069         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2070         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2071         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2072
2073         return 0;
2074 }
2075
2076
2077 /*
2078   elections are won by first checking the number of connected nodes, then
2079   the priority time, then the pnn
2080
      This structure is sent as the raw payload of an election message
      (see send_election_request()), so its layout must not be changed.
2080  */
2081 struct election_message {
2082         uint32_t num_connected;        /* nodes the sender sees as not disconnected */
2083         struct timeval priority_time;  /* sender's rec->priority_time */
2084         uint32_t pnn;                  /* pnn of the sending node */
2085         uint32_t node_flags;           /* sender's own node flags (NODE_FLAGS_*) */
2086 };
2087
2088 /*
2089   form this nodes election data
2090  */
2091 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2092 {
2093         int ret, i;
2094         struct ctdb_node_map *nodemap;
2095         struct ctdb_context *ctdb = rec->ctdb;
2096
2097         ZERO_STRUCTP(em);
2098
2099         em->pnn = rec->ctdb->pnn;
2100         em->priority_time = rec->priority_time;
2101
2102         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2103         if (ret != 0) {
2104                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
2105                 return;
2106         }
2107
2108         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2109         em->node_flags = rec->node_flags;
2110
2111         for (i=0;i<nodemap->num;i++) {
2112                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2113                         em->num_connected++;
2114                 }
2115         }
2116
2117         /* we shouldnt try to win this election if we cant be a recmaster */
2118         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2119                 em->num_connected = 0;
2120                 em->priority_time = timeval_current();
2121         }
2122
2123         talloc_free(nodemap);
2124 }
2125
2126 /*
2127   see if the given election data wins
2128  */
2129 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2130 {
2131         struct election_message myem;
2132         int cmp = 0;
2133
2134         ctdb_election_data(rec, &myem);
2135
2136         /* we cant win if we dont have the recmaster capability */
2137         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2138                 return false;
2139         }
2140
2141         /* we cant win if we are banned */
2142         if (rec->node_flags & NODE_FLAGS_BANNED) {
2143                 return false;
2144         }
2145
2146         /* we cant win if we are stopped */
2147         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2148                 return false;
2149         }
2150
2151         /* we will automatically win if the other node is banned */
2152         if (em->node_flags & NODE_FLAGS_BANNED) {
2153                 return true;
2154         }
2155
2156         /* we will automatically win if the other node is banned */
2157         if (em->node_flags & NODE_FLAGS_STOPPED) {
2158                 return true;
2159         }
2160
2161         /* try to use the most connected node */
2162         if (cmp == 0) {
2163                 cmp = (int)myem.num_connected - (int)em->num_connected;
2164         }
2165
2166         /* then the longest running node */
2167         if (cmp == 0) {
2168                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2169         }
2170
2171         if (cmp == 0) {
2172                 cmp = (int)myem.pnn - (int)em->pnn;
2173         }
2174
2175         return cmp > 0;
2176 }
2177
2178 /*
2179   send out an election request
2180  */
2181 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2182 {
2183         int ret;
2184         TDB_DATA election_data;
2185         struct election_message emsg;
2186         uint64_t srvid;
2187         struct ctdb_context *ctdb = rec->ctdb;
2188
2189         srvid = CTDB_SRVID_RECOVERY;
2190
2191         ctdb_election_data(rec, &emsg);
2192
2193         election_data.dsize = sizeof(struct election_message);
2194         election_data.dptr  = (unsigned char *)&emsg;
2195
2196
2197         /* first we assume we will win the election and set 
2198            recoverymaster to be ourself on the current node
2199          */
2200         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2201         if (ret != 0) {
2202                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2203                 return -1;
2204         }
2205
2206
2207         /* send an election message to all active nodes */
2208         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2209         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2210
2211         return 0;
2212 }
2213
2214 /*
2215   this function will unban all nodes in the cluster
2216 */
2217 static void unban_all_nodes(struct ctdb_context *ctdb)
2218 {
2219         int ret, i;
2220         struct ctdb_node_map *nodemap;
2221         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2222         
2223         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2224         if (ret != 0) {
2225                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2226                 return;
2227         }
2228
2229         for (i=0;i<nodemap->num;i++) {
2230                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2231                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2232                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2233                 }
2234         }
2235
2236         talloc_free(tmp_ctx);
2237 }
2238
2239
2240 /*
2241   we think we are winning the election - send a broadcast election request
2242  */
2243 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2244 {
2245         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2246         int ret;
2247
2248         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2251         }
2252
2253         talloc_free(rec->send_election_te);
2254         rec->send_election_te = NULL;
2255 }
2256
2257 /*
2258   handler for memory dumps
2259 */
2260 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2261                              TDB_DATA data, void *private_data)
2262 {
2263         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2264         TDB_DATA *dump;
2265         int ret;
2266         struct srvid_request *rd;
2267
2268         if (data.dsize != sizeof(struct srvid_request)) {
2269                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2270                 talloc_free(tmp_ctx);
2271                 return;
2272         }
2273         rd = (struct srvid_request *)data.dptr;
2274
2275         dump = talloc_zero(tmp_ctx, TDB_DATA);
2276         if (dump == NULL) {
2277                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2278                 talloc_free(tmp_ctx);
2279                 return;
2280         }
2281         ret = ctdb_dump_memory(ctdb, dump);
2282         if (ret != 0) {
2283                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2284                 talloc_free(tmp_ctx);
2285                 return;
2286         }
2287
2288 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2289
2290         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2291         if (ret != 0) {
2292                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2293                 talloc_free(tmp_ctx);
2294                 return;
2295         }
2296
2297         talloc_free(tmp_ctx);
2298 }
2299
2300 /*
2301   handler for getlog
2302 */
2303 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2304                            TDB_DATA data, void *private_data)
2305 {
2306         struct ctdb_get_log_addr *log_addr;
2307         pid_t child;
2308
2309         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2310                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2311                 return;
2312         }
2313         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2314
2315         child = ctdb_fork_no_free_ringbuffer(ctdb);
2316         if (child == (pid_t)-1) {
2317                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2318                 return;
2319         }
2320
2321         if (child == 0) {
2322                 ctdb_set_process_name("ctdb_rec_log_collector");
2323                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2324                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2325                         _exit(1);
2326                 }
2327                 ctdb_collect_log(ctdb, log_addr);
2328                 _exit(0);
2329         }
2330 }
2331
2332 /*
2333   handler for clearlog
2334 */
2335 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2336                              TDB_DATA data, void *private_data)
2337 {
2338         ctdb_clear_log(ctdb);
2339 }
2340
2341 /*
2342   handler for reload_nodes
2343 */
2344 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2345                              TDB_DATA data, void *private_data)
2346 {
2347         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2348
2349         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2350
2351         ctdb_load_nodes_file(rec->ctdb);
2352 }
2353
2354
2355 static void ctdb_rebalance_timeout(struct event_context *ev,
2356                                    struct timed_event *te,
2357                                    struct timeval t, void *p)
2358 {
2359         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2360
2361         if (rec->force_rebalance_nodes == NULL) {
2362                 DEBUG(DEBUG_ERR,
2363                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2364                 return;
2365         }
2366
2367         DEBUG(DEBUG_NOTICE,
2368               ("Rebalance timeout occurred - do takeover run\n"));
2369         do_takeover_run(rec, rec->nodemap, false);
2370 }
2371
2372         
2373 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2374                                         uint64_t srvid,
2375                                         TDB_DATA data, void *private_data)
2376 {
2377         uint32_t pnn;
2378         uint32_t *t;
2379         int len;
2380         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2381
2382         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2383                 return;
2384         }
2385
2386         if (data.dsize != sizeof(uint32_t)) {
2387                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2388                 return;
2389         }
2390
2391         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2392                 return;
2393         }
2394
2395         pnn = *(uint32_t *)&data.dptr[0];
2396
2397         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2398
2399         /* Copy any existing list of nodes.  There's probably some
2400          * sort of realloc variant that will do this but we need to
2401          * make sure that freeing the old array also cancels the timer
2402          * event for the timeout... not sure if realloc will do that.
2403          */
2404         len = (rec->force_rebalance_nodes != NULL) ?
2405                 talloc_array_length(rec->force_rebalance_nodes) :
2406                 0;
2407
2408         /* This allows duplicates to be added but they don't cause
2409          * harm.  A call to add a duplicate PNN arguably means that
2410          * the timeout should be reset, so this is the simplest
2411          * solution.
2412          */
2413         t = talloc_zero_array(rec, uint32_t, len+1);
2414         CTDB_NO_MEMORY_VOID(ctdb, t);
2415         if (len > 0) {
2416                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2417         }
2418         t[len] = pnn;
2419
2420         talloc_free(rec->force_rebalance_nodes);
2421
2422         rec->force_rebalance_nodes = t;
2423         event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2424                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2425                         ctdb_rebalance_timeout, rec);
2426 }
2427
2428
2429
2430 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2431                              TDB_DATA data, void *private_data)
2432 {
2433         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2434         struct ctdb_public_ip *ip;
2435
2436         if (rec->recmaster != rec->ctdb->pnn) {
2437                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2438                 return;
2439         }
2440
2441         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2442                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2443                 return;
2444         }
2445
2446         ip = (struct ctdb_public_ip *)data.dptr;
2447
2448         update_ip_assignment_tree(rec->ctdb, ip);
2449 }
2450
2451
2452 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2453 {
2454         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2455 }
2456
2457 static void reenable_takeover_runs(struct event_context *ev,
2458                                    struct timed_event *te,
2459                                    struct timeval yt, void *p)
2460 {
2461         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2462
2463         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2464         clear_takeover_runs_disable(rec);
2465 }
2466
2467 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2468                                           uint64_t srvid, TDB_DATA data,
2469                                           void *private_data)
2470 {
2471         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2472                                                     struct ctdb_recoverd);
2473         struct srvid_request *r;
2474         uint32_t timeout;
2475         TDB_DATA result;
2476         int32_t ret = 0;
2477
2478         /* Validate input data */
2479         if (data.dsize != sizeof(struct srvid_request)) {
2480                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2481                                  "expecting %lu\n", (long unsigned)data.dsize,
2482                                  (long unsigned)sizeof(struct srvid_request)));
2483                 ret = -EINVAL;
2484                 goto done;
2485         }
2486         if (data.dptr == NULL) {
2487                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2488                 ret = -EINVAL;
2489                 goto done;
2490         }
2491
2492         r = (struct srvid_request *)data.dptr;
2493         timeout = r->data;
2494
2495         if (timeout == 0) {
2496                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2497                 clear_takeover_runs_disable(rec);
2498                 ret = ctdb_get_pnn(ctdb);
2499                 goto done;
2500         }
2501
2502         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
2503                 DEBUG(DEBUG_ERR,
2504                       ("Refusing to disable takeover runs on inactive node\n"));
2505                 ret = -EHOSTDOWN;
2506                 goto done;
2507         }
2508
2509         if (rec->takeover_run_in_progress) {
2510                 DEBUG(DEBUG_ERR,
2511                       ("Unable to disable takeover runs - in progress\n"));
2512                 ret = -EAGAIN;
2513                 goto done;
2514         }
2515
2516         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2517
2518         /* Clear any old timers */
2519         clear_takeover_runs_disable(rec);
2520
2521         /* When this is non-NULL it indicates that takeover runs are
2522          * disabled.  This context also holds the timeout timer.
2523          */
2524         rec->takeover_runs_disable_ctx = talloc_new(rec);
2525         if (rec->takeover_runs_disable_ctx == NULL) {
2526                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2527                 ret = -ENOMEM;
2528                 goto done;
2529         }
2530
2531         /* Arrange for the timeout to occur */
2532         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2533                         timeval_current_ofs(timeout, 0),
2534                         reenable_takeover_runs,
2535                         rec);
2536
2537         /* Returning our PNN tells the caller that we succeeded */
2538         ret = ctdb_get_pnn(ctdb);
2539 done:
2540         result.dsize = sizeof(int32_t);
2541         result.dptr  = (uint8_t *)&ret;
2542         srvid_request_reply(ctdb, r, result);
2543 }
2544
2545 /* Backward compatibility for this SRVID - call
2546  * disable_takeover_runs_handler() instead
2547  */
2548 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2549                                      TDB_DATA data, void *private_data)
2550 {
2551         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2552                                                     struct ctdb_recoverd);
2553         TDB_DATA data2;
2554         struct srvid_request *req;
2555
2556         if (data.dsize != sizeof(uint32_t)) {
2557                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2558                                  "expecting %lu\n", (long unsigned)data.dsize,
2559                                  (long unsigned)sizeof(uint32_t)));
2560                 return;
2561         }
2562         if (data.dptr == NULL) {
2563                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2564                 return;
2565         }
2566
2567         req = talloc(ctdb, struct srvid_request);
2568         CTDB_NO_MEMORY_VOID(ctdb, req);
2569
2570         req->srvid = 0; /* No reply */
2571         req->pnn = -1;
2572         req->data = *((uint32_t *)data.dptr); /* Timeout */
2573
2574         data2.dsize = sizeof(*req);
2575         data2.dptr = (uint8_t *)req;
2576
2577         disable_takeover_runs_handler(rec->ctdb,
2578                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2579                                       data2, rec);
2580 }
2581
2582 /*
2583   handler for ip reallocate, just add it to the list of requests and 
2584   handle this later in the monitor_cluster loop so we do not recurse
2585   with other requests to takeover_run()
2586 */
2587 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2588                                   TDB_DATA data, void *private_data)
2589 {
2590         struct srvid_request *request;
2591         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2592                                                     struct ctdb_recoverd);
2593
2594         if (data.dsize != sizeof(struct srvid_request)) {
2595                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2596                 return;
2597         }
2598
2599         request = (struct srvid_request *)data.dptr;
2600
2601         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2602 }
2603
2604 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2605                                           struct ctdb_recoverd *rec)
2606 {
2607         TDB_DATA result;
2608         int32_t ret;
2609         uint32_t culprit;
2610
2611         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2612
2613         /* update the list of public ips that a node can handle for
2614            all connected nodes
2615         */
2616         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2617         if (ret != 0) {
2618                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2619                                  culprit));
2620                 rec->need_takeover_run = true;
2621         }
2622         if (ret == 0) {
2623                 if (do_takeover_run(rec, rec->nodemap, false)) {
2624                         ret = ctdb_get_pnn(ctdb);
2625                 } else {
2626                         ret = -1;
2627                 }
2628         }
2629
2630         result.dsize = sizeof(int32_t);
2631         result.dptr  = (uint8_t *)&ret;
2632
2633         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2634 }
2635
2636
2637 /*
2638   handler for recovery master elections
2639 */
2640 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2641                              TDB_DATA data, void *private_data)
2642 {
2643         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2644         int ret;
2645         struct election_message *em = (struct election_message *)data.dptr;
2646         TALLOC_CTX *mem_ctx;
2647
2648         /* we got an election packet - update the timeout for the election */
2649         talloc_free(rec->election_timeout);
2650         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2651                                                 fast_start ?
2652                                                 timeval_current_ofs(0, 500000) :
2653                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2654                                                 ctdb_election_timeout, rec);
2655
2656         mem_ctx = talloc_new(ctdb);
2657
2658         /* someone called an election. check their election data
2659            and if we disagree and we would rather be the elected node, 
2660            send a new election message to all other nodes
2661          */
2662         if (ctdb_election_win(rec, em)) {
2663                 if (!rec->send_election_te) {
2664                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2665                                                                 timeval_current_ofs(0, 500000),
2666                                                                 election_send_request, rec);
2667                 }
2668                 talloc_free(mem_ctx);
2669                 /*unban_all_nodes(ctdb);*/
2670                 return;
2671         }
2672         
2673         /* we didn't win */
2674         talloc_free(rec->send_election_te);
2675         rec->send_election_te = NULL;
2676
2677         if (ctdb->tunable.verify_recovery_lock != 0) {
2678                 /* release the recmaster lock */
2679                 if (em->pnn != ctdb->pnn &&
2680                     ctdb->recovery_lock_fd != -1) {
2681                         close(ctdb->recovery_lock_fd);
2682                         ctdb->recovery_lock_fd = -1;
2683                         unban_all_nodes(ctdb);
2684                 }
2685         }
2686
2687         /* ok, let that guy become recmaster then */
2688         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2689         if (ret != 0) {
2690                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2691                 talloc_free(mem_ctx);
2692                 return;
2693         }
2694
2695         talloc_free(mem_ctx);
2696         return;
2697 }
2698
2699
2700 /*
2701   force the start of the election process
2702  */
2703 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2704                            struct ctdb_node_map *nodemap)
2705 {
2706         int ret;
2707         struct ctdb_context *ctdb = rec->ctdb;
2708
2709         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2710
2711         /* set all nodes to recovery mode to stop all internode traffic */
2712         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2713         if (ret != 0) {
2714                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2715                 return;
2716         }
2717
2718         talloc_free(rec->election_timeout);
2719         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2720                                                 fast_start ?
2721                                                 timeval_current_ofs(0, 500000) :
2722                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2723                                                 ctdb_election_timeout, rec);
2724
2725         ret = send_election_request(rec, pnn);
2726         if (ret!=0) {
2727                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2728                 return;
2729         }
2730
2731         /* wait for a few seconds to collect all responses */
2732         ctdb_wait_election(rec);
2733 }
2734
2735
2736
2737 /*
2738   handler for when a node changes its flags
2739 */
2740 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2741                             TDB_DATA data, void *private_data)
2742 {
2743         int ret;
2744         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2745         struct ctdb_node_map *nodemap=NULL;
2746         TALLOC_CTX *tmp_ctx;
2747         int i;
2748         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2749         int disabled_flag_changed;
2750
2751         if (data.dsize != sizeof(*c)) {
2752                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2753                 return;
2754         }
2755
2756         tmp_ctx = talloc_new(ctdb);
2757         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2758
2759         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2760         if (ret != 0) {
2761                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2762                 talloc_free(tmp_ctx);
2763                 return;         
2764         }
2765
2766
2767         for (i=0;i<nodemap->num;i++) {
2768                 if (nodemap->nodes[i].pnn == c->pnn) break;
2769         }
2770
2771         if (i == nodemap->num) {
2772                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2773                 talloc_free(tmp_ctx);
2774                 return;
2775         }
2776
2777         if (c->old_flags != c->new_flags) {
2778                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2779         }
2780
2781         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2782
2783         nodemap->nodes[i].flags = c->new_flags;
2784
2785         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2786                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2787
2788         if (ret == 0) {
2789                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2790                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2791         }
2792         
2793         if (ret == 0 &&
2794             ctdb->recovery_master == ctdb->pnn &&
2795             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2796                 /* Only do the takeover run if the perm disabled or unhealthy
2797                    flags changed since these will cause an ip failover but not
2798                    a recovery.
2799                    If the node became disconnected or banned this will also
2800                    lead to an ip address failover but that is handled 
2801                    during recovery
2802                 */
2803                 if (disabled_flag_changed) {
2804                         rec->need_takeover_run = true;
2805                 }
2806         }
2807
2808         talloc_free(tmp_ctx);
2809 }
2810
2811 /*
2812   handler for when we need to push out flag changes ot all other nodes
2813 */
2814 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2815                             TDB_DATA data, void *private_data)
2816 {
2817         int ret;
2818         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2819         struct ctdb_node_map *nodemap=NULL;
2820         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2821         uint32_t recmaster;
2822         uint32_t *nodes;
2823
2824         /* find the recovery master */
2825         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2826         if (ret != 0) {
2827                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2828                 talloc_free(tmp_ctx);
2829                 return;
2830         }
2831
2832         /* read the node flags from the recmaster */
2833         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2834         if (ret != 0) {
2835                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2836                 talloc_free(tmp_ctx);
2837                 return;
2838         }
2839         if (c->pnn >= nodemap->num) {
2840                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2841                 talloc_free(tmp_ctx);
2842                 return;
2843         }
2844
2845         /* send the flags update to all connected nodes */
2846         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2847
2848         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2849                                       nodes, 0, CONTROL_TIMEOUT(),
2850                                       false, data,
2851                                       NULL, NULL,
2852                                       NULL) != 0) {
2853                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2854
2855                 talloc_free(tmp_ctx);
2856                 return;
2857         }
2858
2859         talloc_free(tmp_ctx);
2860 }
2861
2862
2863 struct verify_recmode_normal_data {
2864         uint32_t count;
2865         enum monitor_result status;
2866 };
2867
2868 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2869 {
2870         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2871
2872
2873         /* one more node has responded with recmode data*/
2874         rmdata->count--;
2875
2876         /* if we failed to get the recmode, then return an error and let
2877            the main loop try again.
2878         */
2879         if (state->state != CTDB_CONTROL_DONE) {
2880                 if (rmdata->status == MONITOR_OK) {
2881                         rmdata->status = MONITOR_FAILED;
2882                 }
2883                 return;
2884         }
2885
2886         /* if we got a response, then the recmode will be stored in the
2887            status field
2888         */
2889         if (state->status != CTDB_RECOVERY_NORMAL) {
2890                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2891                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2892         }
2893
2894         return;
2895 }
2896
2897
2898 /* verify that all nodes are in normal recovery mode */
2899 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2900 {
2901         struct verify_recmode_normal_data *rmdata;
2902         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2903         struct ctdb_client_control_state *state;
2904         enum monitor_result status;
2905         int j;
2906         
2907         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2908         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2909         rmdata->count  = 0;
2910         rmdata->status = MONITOR_OK;
2911
2912         /* loop over all active nodes and send an async getrecmode call to 
2913            them*/
2914         for (j=0; j<nodemap->num; j++) {
2915                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2916                         continue;
2917                 }
2918                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2919                                         CONTROL_TIMEOUT(), 
2920                                         nodemap->nodes[j].pnn);
2921                 if (state == NULL) {
2922                         /* we failed to send the control, treat this as 
2923                            an error and try again next iteration
2924                         */                      
2925                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2926                         talloc_free(mem_ctx);
2927                         return MONITOR_FAILED;
2928                 }
2929
2930                 /* set up the callback functions */
2931                 state->async.fn = verify_recmode_normal_callback;
2932                 state->async.private_data = rmdata;
2933
2934                 /* one more control to wait for to complete */
2935                 rmdata->count++;
2936         }
2937
2938
2939         /* now wait for up to the maximum number of seconds allowed
2940            or until all nodes we expect a response from has replied
2941         */
2942         while (rmdata->count > 0) {
2943                 event_loop_once(ctdb->ev);
2944         }
2945
2946         status = rmdata->status;
2947         talloc_free(mem_ctx);
2948         return status;
2949 }
2950
2951
2952 struct verify_recmaster_data {
2953         struct ctdb_recoverd *rec;
2954         uint32_t count;
2955         uint32_t pnn;
2956         enum monitor_result status;
2957 };
2958
2959 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2960 {
2961         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2962
2963
2964         /* one more node has responded with recmaster data*/
2965         rmdata->count--;
2966
2967         /* if we failed to get the recmaster, then return an error and let
2968            the main loop try again.
2969         */
2970         if (state->state != CTDB_CONTROL_DONE) {
2971                 if (rmdata->status == MONITOR_OK) {
2972                         rmdata->status = MONITOR_FAILED;
2973                 }
2974                 return;
2975         }
2976
2977         /* if we got a response, then the recmaster will be stored in the
2978            status field
2979         */
2980         if (state->status != rmdata->pnn) {
2981                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2982                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2983                 rmdata->status = MONITOR_ELECTION_NEEDED;
2984         }
2985
2986         return;
2987 }
2988
2989
2990 /* verify that all nodes agree that we are the recmaster */
2991 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2992 {
2993         struct ctdb_context *ctdb = rec->ctdb;
2994         struct verify_recmaster_data *rmdata;
2995         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2996         struct ctdb_client_control_state *state;
2997         enum monitor_result status;
2998         int j;
2999         
3000         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3001         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3002         rmdata->rec    = rec;
3003         rmdata->count  = 0;
3004         rmdata->pnn    = pnn;
3005         rmdata->status = MONITOR_OK;
3006
3007         /* loop over all active nodes and send an async getrecmaster call to 
3008            them*/
3009         for (j=0; j<nodemap->num; j++) {
3010                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3011                         continue;
3012                 }
3013                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3014                                         CONTROL_TIMEOUT(),
3015                                         nodemap->nodes[j].pnn);
3016                 if (state == NULL) {
3017                         /* we failed to send the control, treat this as 
3018                            an error and try again next iteration
3019                         */                      
3020                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3021                         talloc_free(mem_ctx);
3022                         return MONITOR_FAILED;
3023                 }
3024
3025                 /* set up the callback functions */
3026                 state->async.fn = verify_recmaster_callback;
3027                 state->async.private_data = rmdata;
3028
3029                 /* one more control to wait for to complete */
3030                 rmdata->count++;
3031         }
3032
3033
3034         /* now wait for up to the maximum number of seconds allowed
3035            or until all nodes we expect a response from has replied
3036         */
3037         while (rmdata->count > 0) {
3038                 event_loop_once(ctdb->ev);
3039         }
3040
3041         status = rmdata->status;
3042         talloc_free(mem_ctx);
3043         return status;
3044 }
3045
3046 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3047                                     struct ctdb_recoverd *rec)
3048 {
3049         struct ctdb_control_get_ifaces *ifaces = NULL;
3050         TALLOC_CTX *mem_ctx;
3051         bool ret = false;
3052
3053         mem_ctx = talloc_new(NULL);
3054
3055         /* Read the interfaces from the local node */
3056         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3057                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3058                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3059                 /* We could return an error.  However, this will be
3060                  * rare so we'll decide that the interfaces have
3061                  * actually changed, just in case.
3062                  */
3063                 talloc_free(mem_ctx);
3064                 return true;
3065         }
3066
3067         if (!rec->ifaces) {
3068                 /* We haven't been here before so things have changed */
3069                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3070                 ret = true;
3071         } else if (rec->ifaces->num != ifaces->num) {
3072                 /* Number of interfaces has changed */
3073                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3074                                      rec->ifaces->num, ifaces->num));
3075                 ret = true;
3076         } else {
3077                 /* See if interface names or link states have changed */
3078                 int i;
3079                 for (i = 0; i < rec->ifaces->num; i++) {
3080                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3081                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3082                                 DEBUG(DEBUG_NOTICE,
3083                                       ("Interface in slot %d changed: %s => %s\n",
3084                                        i, iface->name, ifaces->ifaces[i].name));
3085                                 ret = true;
3086                                 break;
3087                         }
3088                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3089                                 DEBUG(DEBUG_NOTICE,
3090                                       ("Interface %s changed state: %d => %d\n",
3091                                        iface->name, iface->link_state,
3092                                        ifaces->ifaces[i].link_state));
3093                                 ret = true;
3094                                 break;
3095                         }
3096                 }
3097         }
3098
3099         talloc_free(rec->ifaces);
3100         rec->ifaces = talloc_steal(rec, ifaces);
3101
3102         talloc_free(mem_ctx);
3103         return ret;
3104 }
3105
3106 /* called to check that the local allocation of public ip addresses is ok.
3107 */
3108 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3109 {
3110         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3111         struct ctdb_uptime *uptime1 = NULL;
3112         struct ctdb_uptime *uptime2 = NULL;
3113         int ret, j;
3114         bool need_takeover_run = false;
3115
3116         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3117                                 CTDB_CURRENT_NODE, &uptime1);
3118         if (ret != 0) {
3119                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3120                 talloc_free(mem_ctx);
3121                 return -1;
3122         }
3123
3124         if (interfaces_have_changed(ctdb, rec)) {
3125                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3126                                      "local node %u - force takeover run\n",
3127                                      pnn));
3128                 need_takeover_run = true;
3129         }
3130
3131         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3132                                 CTDB_CURRENT_NODE, &uptime2);
3133         if (ret != 0) {
3134                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3135                 talloc_free(mem_ctx);
3136                 return -1;
3137         }
3138
3139         /* skip the check if the startrecovery time has changed */
3140         if (timeval_compare(&uptime1->last_recovery_started,
3141                             &uptime2->last_recovery_started) != 0) {
3142                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3143                 talloc_free(mem_ctx);
3144                 return 0;
3145         }
3146
3147         /* skip the check if the endrecovery time has changed */
3148         if (timeval_compare(&uptime1->last_recovery_finished,
3149                             &uptime2->last_recovery_finished) != 0) {
3150                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3151                 talloc_free(mem_ctx);
3152                 return 0;
3153         }
3154
3155         /* skip the check if we have started but not finished recovery */
3156         if (timeval_compare(&uptime1->last_recovery_finished,
3157                             &uptime1->last_recovery_started) != 1) {
3158                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3159                 talloc_free(mem_ctx);
3160
3161                 return 0;
3162         }
3163
3164         /* verify that we have the ip addresses we should have
3165            and we dont have ones we shouldnt have.
3166            if we find an inconsistency we set recmode to
3167            active on the local node and wait for the recmaster
3168            to do a full blown recovery.
3169            also if the pnn is -1 and we are healthy and can host the ip
3170            we also request a ip reallocation.
3171         */
3172         if (ctdb->tunable.disable_ip_failover == 0) {
3173                 struct ctdb_all_public_ips *ips = NULL;
3174
3175                 /* read the *available* IPs from the local node */
3176                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3177                 if (ret != 0) {
3178                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3179                         talloc_free(mem_ctx);
3180                         return -1;
3181                 }
3182
3183                 for (j=0; j<ips->num; j++) {
3184                         if (ips->ips[j].pnn == -1 &&
3185                             nodemap->nodes[pnn].flags == 0) {
3186                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3187                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3188                                 need_takeover_run = true;
3189                         }
3190                 }
3191
3192                 talloc_free(ips);
3193
3194                 /* read the *known* IPs from the local node */
3195                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3196                 if (ret != 0) {
3197                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3198                         talloc_free(mem_ctx);
3199                         return -1;
3200                 }
3201
3202                 for (j=0; j<ips->num; j++) {
3203                         if (ips->ips[j].pnn == pnn) {
3204                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3205                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3206                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3207                                         need_takeover_run = true;
3208                                 }
3209                         } else {
3210                                 if (ctdb->do_checkpublicip &&
3211                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3212
3213                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3214                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3215
3216                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3217                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3218                                         }
3219                                 }
3220                         }
3221                 }
3222         }
3223
3224         if (need_takeover_run) {
3225                 struct srvid_request rd;
3226                 TDB_DATA data;
3227
3228                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3229
3230                 rd.pnn = ctdb->pnn;
3231                 rd.srvid = 0;
3232                 data.dptr = (uint8_t *)&rd;
3233                 data.dsize = sizeof(rd);
3234
3235                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3236                 if (ret != 0) {
3237                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3238                 }
3239         }
3240         talloc_free(mem_ctx);
3241         return 0;
3242 }
3243
3244
3245 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3246 {
3247         struct ctdb_node_map **remote_nodemaps = callback_data;
3248
3249         if (node_pnn >= ctdb->num_nodes) {
3250                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3251                 return;
3252         }
3253
3254         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3255
3256 }
3257
3258 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3259         struct ctdb_node_map *nodemap,
3260         struct ctdb_node_map **remote_nodemaps)
3261 {
3262         uint32_t *nodes;
3263
3264         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3265         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3266                                         nodes, 0,
3267                                         CONTROL_TIMEOUT(), false, tdb_null,
3268                                         async_getnodemap_callback,
3269                                         NULL,
3270                                         remote_nodemaps) != 0) {
3271                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3272
3273                 return -1;
3274         }
3275
3276         return 0;
3277 }
3278
/* Outcome of a recovery lock check.  The OK/FAILED values are also the
 * byte the child writes back through the pipe.
 */
enum reclock_child_status {
        RECLOCK_CHECKING,       /* check still in progress */
        RECLOCK_OK,             /* child reported success */
        RECLOCK_FAILED,         /* child reported failure, or bad reply */
        RECLOCK_TIMEOUT         /* timed event fired before a reply */
};

/* State of one recovery lock check, see check_recovery_lock() */
struct ctdb_check_reclock_state {
        struct ctdb_context *ctdb;
        /* when the check was started, used to report the latency */
        struct timeval start_time;
        /* pipe: parent reads from fd[0], child writes to fd[1] */
        int fd[2];
        /* pid of the forked checker, killed by the destructor */
        pid_t child;
        /* timeout for the check */
        struct timed_event *te;
        /* read event on fd[0] */
        struct fd_event *fde;
        enum reclock_child_status status;
};
3289
3290 /* when we free the reclock state we must kill any child process.
3291 */
3292 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3293 {
3294         struct ctdb_context *ctdb = state->ctdb;
3295
3296         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3297
3298         if (state->fd[0] != -1) {
3299                 close(state->fd[0]);
3300                 state->fd[0] = -1;
3301         }
3302         if (state->fd[1] != -1) {
3303                 close(state->fd[1]);
3304                 state->fd[1] = -1;
3305         }
3306         ctdb_kill(ctdb, state->child, SIGKILL);
3307         return 0;
3308 }
3309
3310 /*
3311   called if our check_reclock child times out. this would happen if
3312   i/o to the reclock file blocks.
3313  */
3314 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3315                                          struct timeval t, void *private_data)
3316 {
3317         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3318                                            struct ctdb_check_reclock_state);
3319
3320         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3321         state->status = RECLOCK_TIMEOUT;
3322 }
3323
3324 /* this is called when the child process has completed checking the reclock
3325    file and has written data back to us through the pipe.
3326 */
3327 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3328                              uint16_t flags, void *private_data)
3329 {
3330         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3331                                              struct ctdb_check_reclock_state);
3332         char c = 0;
3333         int ret;
3334
3335         /* we got a response from our child process so we can abort the
3336            timeout.
3337         */
3338         talloc_free(state->te);
3339         state->te = NULL;
3340
3341         ret = read(state->fd[0], &c, 1);
3342         if (ret != 1 || c != RECLOCK_OK) {
3343                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3344                 state->status = RECLOCK_FAILED;
3345
3346                 return;
3347         }
3348
3349         state->status = RECLOCK_OK;
3350         return;
3351 }
3352
3353 static int check_recovery_lock(struct ctdb_context *ctdb)
3354 {
3355         int ret;
3356         struct ctdb_check_reclock_state *state;
3357         pid_t parent = getpid();
3358
3359         if (ctdb->recovery_lock_fd == -1) {
3360                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3361                 return -1;
3362         }
3363
3364         state = talloc(ctdb, struct ctdb_check_reclock_state);
3365         CTDB_NO_MEMORY(ctdb, state);
3366
3367         state->ctdb = ctdb;
3368         state->start_time = timeval_current();
3369         state->status = RECLOCK_CHECKING;
3370         state->fd[0] = -1;
3371         state->fd[1] = -1;
3372
3373         ret = pipe(state->fd);
3374         if (ret != 0) {
3375                 talloc_free(state);
3376                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3377                 return -1;
3378         }
3379
3380         state->child = ctdb_fork(ctdb);
3381         if (state->child == (pid_t)-1) {
3382                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3383                 close(state->fd[0]);
3384                 state->fd[0] = -1;
3385                 close(state->fd[1]);
3386                 state->fd[1] = -1;
3387                 talloc_free(state);
3388                 return -1;
3389         }
3390
3391         if (state->child == 0) {
3392                 char cc = RECLOCK_OK;
3393                 close(state->fd[0]);
3394                 state->fd[0] = -1;
3395
3396                 ctdb_set_process_name("ctdb_rec_reclock");
3397                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3398                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3399                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3400                         cc = RECLOCK_FAILED;
3401                 }
3402
3403                 write(state->fd[1], &cc, 1);
3404                 /* make sure we die when our parent dies */
3405                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3406                         sleep(5);
3407                 }
3408                 _exit(0);
3409         }
3410         close(state->fd[1]);
3411         state->fd[1] = -1;
3412         set_close_on_exec(state->fd[0]);
3413
3414         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3415
3416         talloc_set_destructor(state, check_reclock_destructor);
3417
3418         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3419                                     ctdb_check_reclock_timeout, state);
3420         if (state->te == NULL) {
3421                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3422                 talloc_free(state);
3423                 return -1;
3424         }
3425
3426         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3427                                 EVENT_FD_READ,
3428                                 reclock_child_handler,
3429                                 (void *)state);
3430
3431         if (state->fde == NULL) {
3432                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3433                 talloc_free(state);
3434                 return -1;
3435         }
3436         tevent_fd_set_auto_close(state->fde);
3437
3438         while (state->status == RECLOCK_CHECKING) {
3439                 event_loop_once(ctdb->ev);
3440         }
3441
3442         if (state->status == RECLOCK_FAILED) {
3443                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3444                 close(ctdb->recovery_lock_fd);
3445                 ctdb->recovery_lock_fd = -1;
3446                 talloc_free(state);
3447                 return -1;
3448         }
3449
3450         talloc_free(state);
3451         return 0;
3452 }
3453
3454 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3455 {
3456         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3457         const char *reclockfile;
3458
3459         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3460                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3461                 talloc_free(tmp_ctx);
3462                 return -1;      
3463         }
3464
3465         if (reclockfile == NULL) {
3466                 if (ctdb->recovery_lock_file != NULL) {
3467                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3468                         talloc_free(ctdb->recovery_lock_file);
3469                         ctdb->recovery_lock_file = NULL;
3470                         if (ctdb->recovery_lock_fd != -1) {
3471                                 close(ctdb->recovery_lock_fd);
3472                                 ctdb->recovery_lock_fd = -1;
3473                         }
3474                 }
3475                 ctdb->tunable.verify_recovery_lock = 0;
3476                 talloc_free(tmp_ctx);
3477                 return 0;
3478         }
3479
3480         if (ctdb->recovery_lock_file == NULL) {
3481                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3482                 if (ctdb->recovery_lock_fd != -1) {
3483                         close(ctdb->recovery_lock_fd);
3484                         ctdb->recovery_lock_fd = -1;
3485                 }
3486                 talloc_free(tmp_ctx);
3487                 return 0;
3488         }
3489
3490
3491         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3492                 talloc_free(tmp_ctx);
3493                 return 0;
3494         }
3495
3496         talloc_free(ctdb->recovery_lock_file);
3497         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3498         ctdb->tunable.verify_recovery_lock = 0;
3499         if (ctdb->recovery_lock_fd != -1) {
3500                 close(ctdb->recovery_lock_fd);
3501                 ctdb->recovery_lock_fd = -1;
3502         }
3503
3504         talloc_free(tmp_ctx);
3505         return 0;
3506 }
3507
3508 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3509                       TALLOC_CTX *mem_ctx)
3510 {
3511         uint32_t pnn;
3512         struct ctdb_node_map *nodemap=NULL;
3513         struct ctdb_node_map *recmaster_nodemap=NULL;
3514         struct ctdb_node_map **remote_nodemaps=NULL;
3515         struct ctdb_vnn_map *vnnmap=NULL;
3516         struct ctdb_vnn_map *remote_vnnmap=NULL;
3517         int32_t debug_level;
3518         int i, j, ret;
3519         bool self_ban;
3520
3521
3522         /* verify that the main daemon is still running */
3523         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3524                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3525                 exit(-1);
3526         }
3527
3528         /* ping the local daemon to tell it we are alive */
3529         ctdb_ctrl_recd_ping(ctdb);
3530
3531         if (rec->election_timeout) {
3532                 /* an election is in progress */
3533                 return;
3534         }
3535
3536         /* read the debug level from the parent and update locally */
3537         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3538         if (ret !=0) {
3539                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3540                 return;
3541         }
3542         LogLevel = debug_level;
3543
3544         /* get relevant tunables */
3545         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3546         if (ret != 0) {
3547                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3548                 return;
3549         }
3550
3551         /* get the current recovery lock file from the server */
3552         if (update_recovery_lock_file(ctdb) != 0) {
3553                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3554                 return;
3555         }
3556
3557         /* Make sure that if recovery lock verification becomes disabled when
3558            we close the file
3559         */
3560         if (ctdb->tunable.verify_recovery_lock == 0) {
3561                 if (ctdb->recovery_lock_fd != -1) {
3562                         close(ctdb->recovery_lock_fd);
3563                         ctdb->recovery_lock_fd = -1;
3564                 }
3565         }
3566
3567         pnn = ctdb_get_pnn(ctdb);
3568
3569         /* get the vnnmap */
3570         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3571         if (ret != 0) {
3572                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3573                 return;
3574         }
3575
3576
3577         /* get number of nodes */
3578         if (rec->nodemap) {
3579                 talloc_free(rec->nodemap);
3580                 rec->nodemap = NULL;
3581                 nodemap=NULL;
3582         }
3583         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3584         if (ret != 0) {
3585                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3586                 return;
3587         }
3588         nodemap = rec->nodemap;
3589
3590         /* remember our own node flags */
3591         rec->node_flags = nodemap->nodes[pnn].flags;
3592
3593         ban_misbehaving_nodes(rec, &self_ban);
3594         if (self_ban) {
3595                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3596                 return;
3597         }
3598
3599         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3600            also frozen and that the recmode is set to active.
3601         */
3602         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3603                 /* If this node has become inactive then we want to
3604                  * reduce the chances of it taking over the recovery
3605                  * master role when it becomes active again.  This
3606                  * helps to stabilise the recovery master role so that
3607                  * it stays on the most stable node.
3608                  */
3609                 rec->priority_time = timeval_current();
3610
3611                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3612                 if (ret != 0) {
3613                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3614                 }
3615                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3616                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3617
3618                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3619                         if (ret != 0) {
3620                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3621                                 return;
3622                         }
3623                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3624                         if (ret != 0) {
3625                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3626
3627                                 return;
3628                         }
3629                 }
3630
3631                 /* If this node is stopped or banned then it is not the recovery
3632                  * master, so don't do anything. This prevents stopped or banned
3633                  * node from starting election and sending unnecessary controls.
3634                  */
3635                 return;
3636         }
3637
3638         /* check which node is the recovery master */
3639         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3640         if (ret != 0) {
3641                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3642                 return;
3643         }
3644
3645         /* If we are not the recmaster then do some housekeeping */
3646         if (rec->recmaster != pnn) {
3647                 /* Ignore any IP reallocate requests - only recmaster
3648                  * processes them
3649                  */
3650                 TALLOC_FREE(rec->reallocate_requests);
3651                 /* Clear any nodes that should be force rebalanced in
3652                  * the next takeover run.  If the recovery master role
3653                  * has moved then we don't want to process these some
3654                  * time in the future.
3655                  */
3656                 TALLOC_FREE(rec->force_rebalance_nodes);
3657         }
3658
3659         /* This is a special case.  When recovery daemon is started, recmaster
3660          * is set to -1.  If a node is not started in stopped state, then
3661          * start election to decide recovery master
3662          */
3663         if (rec->recmaster == (uint32_t)-1) {
3664                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3665                 force_election(rec, pnn, nodemap);
3666                 return;
3667         }
3668
3669         /* update the capabilities for all nodes */
3670         ret = update_capabilities(ctdb, nodemap);
3671         if (ret != 0) {
3672                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3673                 return;
3674         }
3675
3676         /*
3677          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3678          * but we have, then force an election and try to become the new
3679          * recmaster.
3680          */
3681         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3682             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3683              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3684                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3685                                   " but we (node %u) have - force an election\n",
3686                                   rec->recmaster, pnn));
3687                 force_election(rec, pnn, nodemap);
3688                 return;
3689         }
3690
3691         /* count how many active nodes there are */
3692         rec->num_active    = 0;
3693         rec->num_lmasters  = 0;
3694         rec->num_connected = 0;
3695         for (i=0; i<nodemap->num; i++) {
3696                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3697                         rec->num_active++;
3698                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3699                                 rec->num_lmasters++;
3700                         }
3701                 }
3702                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3703                         rec->num_connected++;
3704                 }
3705         }
3706
3707
3708         /* verify that the recmaster node is still active */
3709         for (j=0; j<nodemap->num; j++) {
3710                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3711                         break;
3712                 }
3713         }
3714
3715         if (j == nodemap->num) {
3716                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3717                 force_election(rec, pnn, nodemap);
3718                 return;
3719         }
3720
3721         /* if recovery master is disconnected we must elect a new recmaster */
3722         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3723                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3724                 force_election(rec, pnn, nodemap);
3725                 return;
3726         }
3727
3728         /* get nodemap from the recovery master to check if it is inactive */
3729         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3730                                    mem_ctx, &recmaster_nodemap);
3731         if (ret != 0) {
3732                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3733                           nodemap->nodes[j].pnn));
3734                 return;
3735         }
3736
3737
3738         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3739             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3740                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3741                 /*
3742                  * update our nodemap to carry the recmaster's notion of
3743                  * its own flags, so that we don't keep freezing the
3744                  * inactive recmaster node...
3745                  */
3746                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3747                 force_election(rec, pnn, nodemap);
3748                 return;
3749         }
3750
3751         /* verify that we have all ip addresses we should have and we dont
3752          * have addresses we shouldnt have.
3753          */ 
3754         if (ctdb->tunable.disable_ip_failover == 0 &&
3755             rec->takeover_runs_disable_ctx == NULL) {
3756                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3757                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3758                 }
3759         }
3760
3761
3762         /* if we are not the recmaster then we do not need to check
3763            if recovery is needed
3764          */
3765         if (pnn != rec->recmaster) {
3766                 return;
3767         }
3768
3769
3770         /* ensure our local copies of flags are right */
3771         ret = update_local_flags(rec, nodemap);
3772         if (ret == MONITOR_ELECTION_NEEDED) {
3773                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3774                 force_election(rec, pnn, nodemap);
3775                 return;
3776         }
3777         if (ret != MONITOR_OK) {
3778                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3779                 return;
3780         }
3781
3782         if (ctdb->num_nodes != nodemap->num) {
3783                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3784                 ctdb_load_nodes_file(ctdb);
3785                 return;
3786         }
3787
3788         /* verify that all active nodes agree that we are the recmaster */
3789         switch (verify_recmaster(rec, nodemap, pnn)) {
3790         case MONITOR_RECOVERY_NEEDED:
3791                 /* can not happen */
3792                 return;
3793         case MONITOR_ELECTION_NEEDED:
3794                 force_election(rec, pnn, nodemap);
3795                 return;
3796         case MONITOR_OK:
3797                 break;
3798         case MONITOR_FAILED:
3799                 return;
3800         }
3801
3802
3803         if (rec->need_recovery) {
3804                 /* a previous recovery didn't finish */
3805                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3806                 return;
3807         }
3808
3809         /* verify that all active nodes are in normal mode 
3810            and not in recovery mode 
3811         */
3812         switch (verify_recmode(ctdb, nodemap)) {
3813         case MONITOR_RECOVERY_NEEDED:
3814                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3815                 return;
3816         case MONITOR_FAILED:
3817                 return;
3818         case MONITOR_ELECTION_NEEDED:
3819                 /* can not happen */
3820         case MONITOR_OK:
3821                 break;
3822         }
3823
3824
3825         if (ctdb->tunable.verify_recovery_lock != 0) {
3826                 /* we should have the reclock - check its not stale */
3827                 ret = check_recovery_lock(ctdb);
3828                 if (ret != 0) {
3829                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3830                         ctdb_set_culprit(rec, ctdb->pnn);
3831                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3832                         return;
3833                 }
3834         }
3835
3836
3837         /* if there are takeovers requested, perform it and notify the waiters */
3838         if (rec->takeover_runs_disable_ctx == NULL &&
3839             rec->reallocate_requests) {
3840                 process_ipreallocate_requests(ctdb, rec);
3841         }
3842
3843         /* get the nodemap for all active remote nodes
3844          */
3845         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3846         if (remote_nodemaps == NULL) {
3847                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3848                 return;
3849         }
3850         for(i=0; i<nodemap->num; i++) {
3851                 remote_nodemaps[i] = NULL;
3852         }
3853         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3854                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3855                 return;
3856         } 
3857
3858         /* verify that all other nodes have the same nodemap as we have
3859         */
3860         for (j=0; j<nodemap->num; j++) {
3861                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3862                         continue;
3863                 }
3864
3865                 if (remote_nodemaps[j] == NULL) {
3866                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3867                         ctdb_set_culprit(rec, j);
3868
3869                         return;
3870                 }
3871
3872                 /* if the nodes disagree on how many nodes there are
3873                    then this is a good reason to try recovery
3874                  */
3875                 if (remote_nodemaps[j]->num != nodemap->num) {
3876                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3877                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3878                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3879                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3880                         return;
3881                 }
3882
3883                 /* if the nodes disagree on which nodes exist and are
3884                    active, then that is also a good reason to do recovery
3885                  */
3886                 for (i=0;i<nodemap->num;i++) {
3887                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3888                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3889                                           nodemap->nodes[j].pnn, i, 
3890                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3891                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3892                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3893                                             vnnmap);
3894                                 return;
3895                         }
3896                 }
3897         }
3898
3899         /*
3900          * Update node flags obtained from each active node. This ensure we have
3901          * up-to-date information for all the nodes.
3902          */
3903         for (j=0; j<nodemap->num; j++) {
3904                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3905                         continue;
3906                 }
3907                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3908         }
3909
3910         for (j=0; j<nodemap->num; j++) {
3911                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3912                         continue;
3913                 }
3914
3915                 /* verify the flags are consistent
3916                 */
3917                 for (i=0; i<nodemap->num; i++) {
3918                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3919                                 continue;
3920                         }
3921                         
3922                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3923                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3924                                   nodemap->nodes[j].pnn, 
3925                                   nodemap->nodes[i].pnn, 
3926                                   remote_nodemaps[j]->nodes[i].flags,
3927                                   nodemap->nodes[i].flags));
3928                                 if (i == j) {
3929                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3930                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3931                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3932                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3933                                                     vnnmap);
3934                                         return;
3935                                 } else {
3936                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3937                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3938                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3939                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3940                                                     vnnmap);
3941                                         return;
3942                                 }
3943                         }
3944                 }
3945         }
3946
3947
3948         /* There must be the same number of lmasters in the vnn map as
3949          * there are active nodes with the lmaster capability...  or
3950          * do a recovery.
3951          */
3952         if (vnnmap->size != rec->num_lmasters) {
3953                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3954                           vnnmap->size, rec->num_lmasters));
3955                 ctdb_set_culprit(rec, ctdb->pnn);
3956                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3957                 return;
3958         }
3959
3960         /* verify that all active nodes in the nodemap also exist in 
3961            the vnnmap.
3962          */
3963         for (j=0; j<nodemap->num; j++) {
3964                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3965                         continue;
3966                 }
3967                 if (nodemap->nodes[j].pnn == pnn) {
3968                         continue;
3969                 }
3970
3971                 for (i=0; i<vnnmap->size; i++) {
3972                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3973                                 break;
3974                         }
3975                 }
3976                 if (i == vnnmap->size) {
3977                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3978                                   nodemap->nodes[j].pnn));
3979                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3980                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3981                         return;
3982                 }
3983         }
3984
3985         
3986         /* verify that all other nodes have the same vnnmap
3987            and are from the same generation
3988          */
3989         for (j=0; j<nodemap->num; j++) {
3990                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3991                         continue;
3992                 }
3993                 if (nodemap->nodes[j].pnn == pnn) {
3994                         continue;
3995                 }
3996
3997                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3998                                           mem_ctx, &remote_vnnmap);
3999                 if (ret != 0) {
4000                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4001                                   nodemap->nodes[j].pnn));
4002                         return;
4003                 }
4004
4005                 /* verify the vnnmap generation is the same */
4006                 if (vnnmap->generation != remote_vnnmap->generation) {
4007                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4008                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4009                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4010                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4011                         return;
4012                 }
4013
4014                 /* verify the vnnmap size is the same */
4015                 if (vnnmap->size != remote_vnnmap->size) {
4016                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
4017                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4018                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4019                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4020                         return;
4021                 }
4022
4023                 /* verify the vnnmap is the same */
4024                 for (i=0;i<vnnmap->size;i++) {
4025                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4026                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
4027                                           nodemap->nodes[j].pnn));
4028                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4029                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4030                                             vnnmap);
4031                                 return;
4032                         }
4033                 }
4034         }
4035
4036         /* we might need to change who has what IP assigned */
4037         if (rec->need_takeover_run) {
4038                 uint32_t culprit = (uint32_t)-1;
4039
4040                 rec->need_takeover_run = false;
4041
4042                 /* update the list of public ips that a node can handle for
4043                    all connected nodes
4044                 */
4045                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4046                 if (ret != 0) {
4047                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4048                                          culprit));
4049                         rec->need_takeover_run = true;
4050                         return;
4051                 }
4052
4053                 /* execute the "startrecovery" event script on all nodes */
4054                 ret = run_startrecovery_eventscript(rec, nodemap);
4055                 if (ret!=0) {
4056                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4057                         ctdb_set_culprit(rec, ctdb->pnn);
4058                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4059                         return;
4060                 }
4061
4062                 /* If takeover run fails, then the offending nodes are
4063                  * assigned ban culprit counts. And we re-try takeover.
4064                  * If takeover run fails repeatedly, the node would get
4065                  * banned.
4066                  *
4067                  * If rec->need_takeover_run is not set to true at this
4068                  * failure, monitoring is disabled cluster-wide (via
4069                  * startrecovery eventscript) and will not get enabled.
4070                  */
4071                 if (!do_takeover_run(rec, nodemap, true)) {
4072                         return;
4073                 }
4074
4075                 /* execute the "recovered" event script on all nodes */
4076                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4077 #if 0
4078 // we cant check whether the event completed successfully
4079 // since this script WILL fail if the node is in recovery mode
4080 // and if that race happens, the code here would just cause a second
4081 // cascading recovery.
4082                 if (ret!=0) {
4083                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4084                         ctdb_set_culprit(rec, ctdb->pnn);
4085                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4086                 }
4087 #endif
4088         }
4089 }
4090
4091 /*
4092   the main monitoring loop
4093  */
4094 static void monitor_cluster(struct ctdb_context *ctdb)
4095 {
4096         struct ctdb_recoverd *rec;
4097
4098         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4099
4100         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4101         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4102
4103         rec->ctdb = ctdb;
4104
4105         rec->takeover_run_in_progress = false;
4106
4107         rec->priority_time = timeval_current();
4108
4109         /* register a message port for sending memory dumps */
4110         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4111
4112         /* register a message port for requesting logs */
4113         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4114
4115         /* register a message port for clearing logs */
4116         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4117
4118         /* register a message port for recovery elections */
4119         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4120
4121         /* when nodes are disabled/enabled */
4122         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4123
4124         /* when we are asked to puch out a flag change */
4125         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4126
4127         /* register a message port for vacuum fetch */
4128         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4129
4130         /* register a message port for reloadnodes  */
4131         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4132
4133         /* register a message port for performing a takeover run */
4134         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4135
4136         /* register a message port for disabling the ip check for a short while */
4137         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4138
4139         /* register a message port for updating the recovery daemons node assignment for an ip */
4140         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4141
4142         /* register a message port for forcing a rebalance of a node next
4143            reallocation */
4144         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4145
4146         /* Register a message port for disabling takeover runs */
4147         ctdb_client_set_message_handler(ctdb,
4148                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4149                                         disable_takeover_runs_handler, rec);
4150
4151         for (;;) {
4152                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4153                 struct timeval start;
4154                 double elapsed;
4155
4156                 if (!mem_ctx) {
4157                         DEBUG(DEBUG_CRIT,(__location__
4158                                           " Failed to create temp context\n"));
4159                         exit(-1);
4160                 }
4161
4162                 start = timeval_current();
4163                 main_loop(ctdb, rec, mem_ctx);
4164                 talloc_free(mem_ctx);
4165
4166                 /* we only check for recovery once every second */
4167                 elapsed = timeval_elapsed(&start);
4168                 if (elapsed < ctdb->tunable.recover_interval) {
4169                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4170                                           - elapsed);
4171                 }
4172         }
4173 }
4174
4175 /*
4176   event handler for when the main ctdbd dies
4177  */
4178 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4179                                  uint16_t flags, void *private_data)
4180 {
4181         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4182         _exit(1);
4183 }
4184
4185 /*
4186   called regularly to verify that the recovery daemon is still running
4187  */
4188 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4189                               struct timeval yt, void *p)
4190 {
4191         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4192
4193         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4194                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4195
4196                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4197                                 ctdb_restart_recd, ctdb);
4198
4199                 return;
4200         }
4201
4202         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4203                         timeval_current_ofs(30, 0),
4204                         ctdb_check_recd, ctdb);
4205 }
4206
4207 static void recd_sig_child_handler(struct event_context *ev,
4208         struct signal_event *se, int signum, int count,
4209         void *dont_care, 
4210         void *private_data)
4211 {
4212 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4213         int status;
4214         pid_t pid = -1;
4215
4216         while (pid != 0) {
4217                 pid = waitpid(-1, &status, WNOHANG);
4218                 if (pid == -1) {
4219                         if (errno != ECHILD) {
4220                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4221                         }
4222                         return;
4223                 }
4224                 if (pid > 0) {
4225                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4226                 }
4227         }
4228 }
4229
4230 /*
4231   startup the recovery daemon as a child of the main ctdb daemon
4232  */
4233 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4234 {
4235         int fd[2];
4236         struct signal_event *se;
4237         struct tevent_fd *fde;
4238
4239         if (pipe(fd) != 0) {
4240                 return -1;
4241         }
4242
4243         ctdb->ctdbd_pid = getpid();
4244
4245         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4246         if (ctdb->recoverd_pid == -1) {
4247                 return -1;
4248         }
4249
4250         if (ctdb->recoverd_pid != 0) {
4251                 talloc_free(ctdb->recd_ctx);
4252                 ctdb->recd_ctx = talloc_new(ctdb);
4253                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4254
4255                 close(fd[0]);
4256                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4257                                 timeval_current_ofs(30, 0),
4258                                 ctdb_check_recd, ctdb);
4259                 return 0;
4260         }
4261
4262         close(fd[1]);
4263
4264         srandom(getpid() ^ time(NULL));
4265
4266         /* Clear the log ringbuffer */
4267         ctdb_clear_log(ctdb);
4268
4269         ctdb_set_process_name("ctdb_recovered");
4270         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4271                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4272                 exit(1);
4273         }
4274
4275         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4276
4277         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4278                      ctdb_recoverd_parent, &fd[0]);
4279         tevent_fd_set_auto_close(fde);
4280
4281         /* set up a handler to pick up sigchld */
4282         se = event_add_signal(ctdb->ev, ctdb,
4283                                      SIGCHLD, 0,
4284                                      recd_sig_child_handler,
4285                                      ctdb);
4286         if (se == NULL) {
4287                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4288                 exit(1);
4289         }
4290
4291         monitor_cluster(ctdb);
4292
4293         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4294         return -1;
4295 }
4296
4297 /*
4298   shutdown the recovery daemon
4299  */
4300 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4301 {
4302         if (ctdb->recoverd_pid == 0) {
4303                 return;
4304         }
4305
4306         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4307         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4308
4309         TALLOC_FREE(ctdb->recd_ctx);
4310         TALLOC_FREE(ctdb->recd_ping_count);
4311 }
4312
4313 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4314                        struct timeval t, void *private_data)
4315 {
4316         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4317
4318         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4319         ctdb_stop_recoverd(ctdb);
4320         ctdb_start_recoverd(ctdb);
4321 }