Revert "null out the pointer before we reload the nodes file"
ctdb/server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* List of SRVID requests that need to be processed */
34 struct srvid_list {
35         struct srvid_list *next, *prev;
36         struct srvid_request *request;
37 };
38
39 struct srvid_requests {
40         struct srvid_list *requests;
41 };
42
43 static void srvid_request_reply(struct ctdb_context *ctdb,
44                                 struct srvid_request *request,
45                                 TDB_DATA result)
46 {
47         /* Someone that sent srvid==0 does not want a reply */
48         if (request->srvid == 0) {
49                 talloc_free(request);
50                 return;
51         }
52
53         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
54                                      result) == 0) {
55                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
56                                   (unsigned)request->pnn,
57                                   (unsigned long long)request->srvid));
58         } else {
59                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
60                                  (unsigned)request->pnn,
61                                  (unsigned long long)request->srvid));
62         }
63
64         talloc_free(request);
65 }
66
67 static void srvid_requests_reply(struct ctdb_context *ctdb,
68                                  struct srvid_requests **requests,
69                                  TDB_DATA result)
70 {
71         struct srvid_list *r;
72
73         for (r = (*requests)->requests; r != NULL; r = r->next) {
74                 srvid_request_reply(ctdb, r->request, result);
75         }
76
77         /* Free the list structure... */
78         TALLOC_FREE(*requests);
79 }
80
81 static void srvid_request_add(struct ctdb_context *ctdb,
82                               struct srvid_requests **requests,
83                               struct srvid_request *request)
84 {
85         struct srvid_list *t;
86         int32_t ret;
87         TDB_DATA result;
88
89         if (*requests == NULL) {
90                 *requests = talloc_zero(ctdb, struct srvid_requests);
91                 if (*requests == NULL) {
92                         goto nomem;
93                 }
94         }
95
96         t = talloc_zero(*requests, struct srvid_list);
97         if (t == NULL) {
98                 /* If *requests was just allocated above then free it */
99                 if ((*requests)->requests == NULL) {
100                         TALLOC_FREE(*requests);
101                 }
102                 goto nomem;
103         }
104
105         t->request = (struct srvid_request *)talloc_steal(t, request);
106         DLIST_ADD((*requests)->requests, t);
107
108         return;
109
110 nomem:
111         /* Failed to add the request to the list.  Send a fail. */
112         DEBUG(DEBUG_ERR, (__location__
113                           " Out of memory, failed to queue SRVID request\n"));
114         ret = -ENOMEM;
115         result.dsize = sizeof(ret);
116         result.dptr = (uint8_t *)&ret;
117         srvid_request_reply(ctdb, request, result);
118 }
119
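/* Usage sketch (illustrative only, not called from this excerpt): a request
   is deferred with srvid_request_add() and later answered for every queued
   caller with a 32-bit result, mirroring the nomem path above.

        struct srvid_requests *pending = NULL;
        int32_t status;
        TDB_DATA result;

        srvid_request_add(ctdb, &pending, request);    /* defer the reply */
        ...
        status = 0;
        result.dsize = sizeof(status);
        result.dptr  = (uint8_t *)&status;
        srvid_requests_reply(ctdb, &pending, result);  /* reply to all, free list */
 */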
120 struct ctdb_banning_state {
121         uint32_t count;
122         struct timeval last_reported_time;
123 };
124
125 /*
126   private state of recovery daemon
127  */
128 struct ctdb_recoverd {
129         struct ctdb_context *ctdb;
130         uint32_t recmaster;
131         uint32_t num_active;
132         uint32_t num_lmasters;
133         uint32_t num_connected;
134         uint32_t last_culprit_node;
135         struct ctdb_node_map *nodemap;
136         struct timeval priority_time;
137         bool need_takeover_run;
138         bool need_recovery;
139         uint32_t node_flags;
140         struct timed_event *send_election_te;
141         struct timed_event *election_timeout;
142         struct vacuum_info *vacuum_info;
143         struct srvid_requests *reallocate_requests;
144         bool takeover_run_in_progress;
145         TALLOC_CTX *takeover_runs_disable_ctx;
146         struct ctdb_control_get_ifaces *ifaces;
147         uint32_t *force_rebalance_nodes;
148 };
149
150 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
151 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
152
153 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
154
155 /*
156   ban a node for a period of time
157  */
158 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
159 {
160         int ret;
161         struct ctdb_context *ctdb = rec->ctdb;
162         struct ctdb_ban_time bantime;
163        
164         if (!ctdb_validate_pnn(ctdb, pnn)) {
165                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
166                 return;
167         }
168
169         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
170
171         bantime.pnn  = pnn;
172         bantime.time = ban_time;
173
174         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
175         if (ret != 0) {
176                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
177                 return;
178         }
179
180 }
181
182 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
183
184
185 /*
186   remember the troublemaker
187  */
188 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
191         struct ctdb_banning_state *ban_state;
192
193         if (culprit >= ctdb->num_nodes) {
194                 DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
195                 return;
196         }
197
198         /* If we are banned or stopped, do not set other nodes as culprits */
199         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
200                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
201                 return;
202         }
203
204         if (ctdb->nodes[culprit]->ban_state == NULL) {
205                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
206                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
207
208                 
209         }
210         ban_state = ctdb->nodes[culprit]->ban_state;
211         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
212                 /* this was the first time in a long while this node
213                    misbehaved so we will forgive any old transgressions.
214                 */
215                 ban_state->count = 0;
216         }
217
218         ban_state->count += count;
219         ban_state->last_reported_time = timeval_current();
220         rec->last_culprit_node = culprit;
221 }
222
223 /*
224   remember the troublemaker
225  */
226 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
227 {
228         ctdb_set_culprit_count(rec, culprit, 1);
229 }
230
231
232 /* this callback is called for every node that failed to execute the
233    recovered event
234 */
235 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
236 {
237         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
238
239         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
240
241         ctdb_set_culprit(rec, node_pnn);
242 }
243
244 /*
245   run the "recovered" eventscript on all nodes
246  */
247 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
248 {
249         TALLOC_CTX *tmp_ctx;
250         uint32_t *nodes;
251         struct ctdb_context *ctdb = rec->ctdb;
252
253         tmp_ctx = talloc_new(ctdb);
254         CTDB_NO_MEMORY(ctdb, tmp_ctx);
255
256         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
257         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
258                                         nodes, 0,
259                                         CONTROL_TIMEOUT(), false, tdb_null,
260                                         NULL, recovered_fail_callback,
261                                         rec) != 0) {
262                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
263
264                 talloc_free(tmp_ctx);
265                 return -1;
266         }
267
268         talloc_free(tmp_ctx);
269         return 0;
270 }
271
272 /* this callback is called for every node that failed to execute the
273    start recovery event
274 */
275 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
276 {
277         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
278
279         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
280
281         ctdb_set_culprit(rec, node_pnn);
282 }
283
284 /*
285   run the "startrecovery" eventscript on all nodes
286  */
287 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
288 {
289         TALLOC_CTX *tmp_ctx;
290         uint32_t *nodes;
291         struct ctdb_context *ctdb = rec->ctdb;
292
293         tmp_ctx = talloc_new(ctdb);
294         CTDB_NO_MEMORY(ctdb, tmp_ctx);
295
296         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
298                                         nodes, 0,
299                                         CONTROL_TIMEOUT(), false, tdb_null,
300                                         NULL,
301                                         startrecovery_fail_callback,
302                                         rec) != 0) {
303                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
304                 talloc_free(tmp_ctx);
305                 return -1;
306         }
307
308         talloc_free(tmp_ctx);
309         return 0;
310 }
311
312 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
313 {
314         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
315                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
316                 return;
317         }
318         if (node_pnn < ctdb->num_nodes) {
319                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
320         }
321
322         if (node_pnn == ctdb->pnn) {
323                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
324         }
325 }
326
327 /*
328   update the node capabilities for all connected nodes
329  */
330 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
331 {
332         uint32_t *nodes;
333         TALLOC_CTX *tmp_ctx;
334
335         tmp_ctx = talloc_new(ctdb);
336         CTDB_NO_MEMORY(ctdb, tmp_ctx);
337
338         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
340                                         nodes, 0,
341                                         CONTROL_TIMEOUT(),
342                                         false, tdb_null,
343                                         async_getcap_callback, NULL,
344                                         NULL) != 0) {
345                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
346                 talloc_free(tmp_ctx);
347                 return -1;
348         }
349
350         talloc_free(tmp_ctx);
351         return 0;
352 }
353
354 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
355 {
356         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
357
358         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
359         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
360 }
361
362 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
363 {
364         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
365
366         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
367         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
368 }
369
370 /*
371   change recovery mode on all nodes
372  */
373 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
374 {
375         TDB_DATA data;
376         uint32_t *nodes;
377         TALLOC_CTX *tmp_ctx;
378
379         tmp_ctx = talloc_new(ctdb);
380         CTDB_NO_MEMORY(ctdb, tmp_ctx);
381
382         /* freeze all nodes */
383         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
384         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
385                 int i;
386
387                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
388                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
389                                                 nodes, i,
390                                                 CONTROL_TIMEOUT(),
391                                                 false, tdb_null,
392                                                 NULL,
393                                                 set_recmode_fail_callback,
394                                                 rec) != 0) {
395                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
396                                 talloc_free(tmp_ctx);
397                                 return -1;
398                         }
399                 }
400         }
401
402
403         data.dsize = sizeof(uint32_t);
404         data.dptr = (unsigned char *)&rec_mode;
405
406         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
407                                         nodes, 0,
408                                         CONTROL_TIMEOUT(),
409                                         false, data,
410                                         NULL, NULL,
411                                         NULL) != 0) {
412                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
413                 talloc_free(tmp_ctx);
414                 return -1;
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
420
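/* Call-pattern sketch (illustrative): a recovery first freezes the databases
   and switches every active node to CTDB_RECOVERY_ACTIVE, rebuilds the
   databases, and only then switches back:

        if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE) != 0) {
                return -1;
        }
        ... recover each database ...
        if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL) != 0) {
                return -1;
        }

   Note that the freeze loop above only runs for CTDB_RECOVERY_ACTIVE.
 */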
421 /*
422   change recovery master on all nodes
423  */
424 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
425 {
426         TDB_DATA data;
427         TALLOC_CTX *tmp_ctx;
428         uint32_t *nodes;
429
430         tmp_ctx = talloc_new(ctdb);
431         CTDB_NO_MEMORY(ctdb, tmp_ctx);
432
433         data.dsize = sizeof(uint32_t);
434         data.dptr = (unsigned char *)&pnn;
435
436         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
437         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
438                                         nodes, 0,
439                                         CONTROL_TIMEOUT(), false, data,
440                                         NULL, NULL,
441                                         NULL) != 0) {
442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
443                 talloc_free(tmp_ctx);
444                 return -1;
445         }
446
447         talloc_free(tmp_ctx);
448         return 0;
449 }
450
451 /* update all remote nodes to use the same db priority that we have.
452    This can fail if the remote node has not yet been upgraded to
453    support this function, so we always return success and never fail
454    a recovery if this call fails.
455 */
456 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
457         struct ctdb_node_map *nodemap, 
458         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int db;
461         uint32_t *nodes;
462
463         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
464
465         /* step through all local databases */
466         for (db=0; db<dbmap->num;db++) {
467                 TDB_DATA data;
468                 struct ctdb_db_priority db_prio;
469                 int ret;
470
471                 db_prio.db_id     = dbmap->dbs[db].dbid;
472                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
473                 if (ret != 0) {
474                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
475                         continue;
476                 }
477
478                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
479
480                 data.dptr  = (uint8_t *)&db_prio;
481                 data.dsize = sizeof(db_prio);
482
483                 if (ctdb_client_async_control(ctdb,
484                                         CTDB_CONTROL_SET_DB_PRIORITY,
485                                         nodes, 0,
486                                         CONTROL_TIMEOUT(), false, data,
487                                         NULL, NULL,
488                                         NULL) != 0) {
489                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
490                 }
491         }
492
493         return 0;
494 }                       
495
496 /*
497   ensure all other nodes have attached to any databases that we have
498  */
499 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
500                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
501 {
502         int i, j, db, ret;
503         struct ctdb_dbid_map *remote_dbmap;
504
505         /* verify that all other nodes have all our databases */
506         for (j=0; j<nodemap->num; j++) {
507                 /* we don't need to check ourselves */
508                 if (nodemap->nodes[j].pnn == pnn) {
509                         continue;
510                 }
511                 /* dont check nodes that are unavailable */
512                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
513                         continue;
514                 }
515
516                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
517                                          mem_ctx, &remote_dbmap);
518                 if (ret != 0) {
519                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
520                         return -1;
521                 }
522
523                 /* step through all local databases */
524                 for (db=0; db<dbmap->num;db++) {
525                         const char *name;
526
527
528                         for (i=0;i<remote_dbmap->num;i++) {
529                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
530                                         break;
531                                 }
532                         }
533                         /* the remote node already has this database */
534                         if (i!=remote_dbmap->num) {
535                                 continue;
536                         }
537                         /* ok so we need to create this database */
538                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
539                                             mem_ctx, &name);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
542                                 return -1;
543                         }
544                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
545                                            mem_ctx, name,
546                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
547                         if (ret != 0) {
548                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
549                                 return -1;
550                         }
551                 }
552         }
553
554         return 0;
555 }
556
557
558 /*
559   ensure we are attached to any databases that anyone else is attached to
560  */
561 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
562                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
563 {
564         int i, j, db, ret;
565         struct ctdb_dbid_map *remote_dbmap;
566
567         /* verify that we have all databases any other node has */
568         for (j=0; j<nodemap->num; j++) {
569                 /* we don't need to check ourselves */
570                 if (nodemap->nodes[j].pnn == pnn) {
571                         continue;
572                 }
573                 /* dont check nodes that are unavailable */
574                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
575                         continue;
576                 }
577
578                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
579                                          mem_ctx, &remote_dbmap);
580                 if (ret != 0) {
581                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
582                         return -1;
583                 }
584
585                 /* step through all databases on the remote node */
586                 for (db=0; db<remote_dbmap->num;db++) {
587                         const char *name;
588
589                         for (i=0;i<(*dbmap)->num;i++) {
590                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
591                                         break;
592                                 }
593                         }
594                         /* we already have this db locally */
595                         if (i!=(*dbmap)->num) {
596                                 continue;
597                         }
598                         /* ok so we need to create this database and
599                            rebuild dbmap
600                          */
601                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
602                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
603                         if (ret != 0) {
604                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
605                                           nodemap->nodes[j].pnn));
606                                 return -1;
607                         }
608                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
609                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
610                         if (ret != 0) {
611                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
612                                 return -1;
613                         }
614                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
615                         if (ret != 0) {
616                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
617                                 return -1;
618                         }
619                 }
620         }
621
622         return 0;
623 }
624
625
626 /*
627   pull the remote database contents from one node into the recdb
628  */
629 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
630                                     struct tdb_wrap *recdb, uint32_t dbid)
631 {
632         int ret;
633         TDB_DATA outdata;
634         struct ctdb_marshall_buffer *reply;
635         struct ctdb_rec_data *rec;
636         int i;
637         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
638
639         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
640                                CONTROL_TIMEOUT(), &outdata);
641         if (ret != 0) {
642                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
643                 talloc_free(tmp_ctx);
644                 return -1;
645         }
646
647         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
648
649         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
650                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
651                 talloc_free(tmp_ctx);
652                 return -1;
653         }
654         
655         rec = (struct ctdb_rec_data *)&reply->data[0];
656         
657         for (i=0;
658              i<reply->count;
659              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
660                 TDB_DATA key, data;
661                 struct ctdb_ltdb_header *hdr;
662                 TDB_DATA existing;
663                 
664                 key.dptr = &rec->data[0];
665                 key.dsize = rec->keylen;
666                 data.dptr = &rec->data[key.dsize];
667                 data.dsize = rec->datalen;
668                 
669                 hdr = (struct ctdb_ltdb_header *)data.dptr;
670
671                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
672                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
673                         talloc_free(tmp_ctx);
674                         return -1;
675                 }
676
677                 /* fetch the existing record, if any */
678                 existing = tdb_fetch(recdb->tdb, key);
679                 
680                 if (existing.dptr != NULL) {
681                         struct ctdb_ltdb_header header;
682                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
683                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
684                                          (unsigned)existing.dsize, srcnode));
685                                 free(existing.dptr);
686                                 talloc_free(tmp_ctx);
687                                 return -1;
688                         }
689                         header = *(struct ctdb_ltdb_header *)existing.dptr;
690                         free(existing.dptr);
691                         if (!(header.rsn < hdr->rsn ||
692                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
693                                 continue;
694                         }
695                 }
696                 
697                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
698                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
699                         talloc_free(tmp_ctx);
700                         return -1;                              
701                 }
702         }
703
704         talloc_free(tmp_ctx);
705
706         return 0;
707 }
708
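/* Merge-rule sketch: a record pulled from srcnode replaces the copy already
   in recdb only when it carries a strictly higher RSN, or the same RSN while
   the stored copy's dmaster is not the recovery master.  Written as a
   standalone predicate (illustrative only; the loop above applies the
   inverted test inline):

        static bool pulled_copy_wins(const struct ctdb_ltdb_header *stored,
                                     const struct ctdb_ltdb_header *pulled,
                                     uint32_t recmaster)
        {
                return stored->rsn < pulled->rsn ||
                       (stored->rsn == pulled->rsn &&
                        stored->dmaster != recmaster);
        }
 */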
709
710 struct pull_seqnum_cbdata {
711         int failed;
712         uint32_t pnn;
713         uint64_t seqnum;
714 };
715
716 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
717 {
718         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
719         uint64_t seqnum;
720
721         if (cb_data->failed != 0) {
722                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
723                 return;
724         }
725
726         if (res != 0) {
727                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
728                 cb_data->failed = 1;
729                 return;
730         }
731
732         if (outdata.dsize != sizeof(uint64_t)) {
733                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
734                 cb_data->failed = 1;
735                 return;
736         }
737
738         seqnum = *((uint64_t *)outdata.dptr);
739
740         if (seqnum > cb_data->seqnum) {
741                 cb_data->seqnum = seqnum;
742                 cb_data->pnn = node_pnn;
743         }
744 }
745
746 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
747 {
748         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
749
750         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
751         cb_data->failed = 1;
752 }
753
754 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
755                                 struct ctdb_recoverd *rec, 
756                                 struct ctdb_node_map *nodemap, 
757                                 struct tdb_wrap *recdb, uint32_t dbid)
758 {
759         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
760         uint32_t *nodes;
761         TDB_DATA data;
762         uint32_t outdata[2];
763         struct pull_seqnum_cbdata *cb_data;
764
765         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
766
767         outdata[0] = dbid;
768         outdata[1] = 0;
769
770         data.dsize = sizeof(outdata);
771         data.dptr  = (uint8_t *)&outdata[0];
772
773         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
774         if (cb_data == NULL) {
775                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
776                 talloc_free(tmp_ctx);
777                 return -1;
778         }
779
780         cb_data->failed = 0;
781         cb_data->pnn    = -1;
782         cb_data->seqnum = 0;
783         
784         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
785         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
786                                         nodes, 0,
787                                         CONTROL_TIMEOUT(), false, data,
788                                         pull_seqnum_cb,
789                                         pull_seqnum_fail_cb,
790                                         cb_data) != 0) {
791                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
792
793                 talloc_free(tmp_ctx);
794                 return -1;
795         }
796
797         if (cb_data->failed != 0) {
798                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
799                 talloc_free(tmp_ctx);
800                 return -1;
801         }
802
803         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
804                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
805                 talloc_free(tmp_ctx);
806                 return -1;
807         }
808
809         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
810
811         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
812                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
813                 talloc_free(tmp_ctx);
814                 return -1;
815         }
816
817         talloc_free(tmp_ctx);
818         return 0;
819 }
820
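/* Worked example (illustrative): if three active nodes report database
   sequence numbers 12, 17 and 17, the first reply carrying 17 wins, because
   pull_seqnum_cb() only updates cb_data on a strictly greater seqnum; the
   whole persistent database is then pulled from that single node instead of
   being merged record by record. */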
821
822 /*
823   pull all the remote database contents into the recdb
824  */
825 static int pull_remote_database(struct ctdb_context *ctdb,
826                                 struct ctdb_recoverd *rec, 
827                                 struct ctdb_node_map *nodemap, 
828                                 struct tdb_wrap *recdb, uint32_t dbid,
829                                 bool persistent)
830 {
831         int j;
832
833         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
834                 int ret;
835                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
836                 if (ret == 0) {
837                         return 0;
838                 }
839         }
840
841         /* pull all records from all other nodes across onto this node
842            (this merges based on rsn)
843         */
844         for (j=0; j<nodemap->num; j++) {
845                 /* dont merge from nodes that are unavailable */
846                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
847                         continue;
848                 }
849                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
850                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
851                                  nodemap->nodes[j].pnn));
852                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
853                         return -1;
854                 }
855         }
856         
857         return 0;
858 }
859
860
861 /*
862   update flags on all active nodes
863  */
864 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
865 {
866         int ret;
867
868         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
869         if (ret != 0) {
870                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
871                 return -1;
872         }
873
874         return 0;
875 }
876
877 /*
878   ensure all nodes have the same vnnmap we do
879  */
880 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
881                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
882 {
883         int j, ret;
884
885         /* push the new vnn map out to all the nodes */
886         for (j=0; j<nodemap->num; j++) {
887                 /* dont push to nodes that are unavailable */
888                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
889                         continue;
890                 }
891
892                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
893                 if (ret != 0) {
894                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
895                         return -1;
896                 }
897         }
898
899         return 0;
900 }
901
902
903 struct vacuum_info {
904         struct vacuum_info *next, *prev;
905         struct ctdb_recoverd *rec;
906         uint32_t srcnode;
907         struct ctdb_db_context *ctdb_db;
908         struct ctdb_marshall_buffer *recs;
909         struct ctdb_rec_data *r;
910 };
911
912 static void vacuum_fetch_next(struct vacuum_info *v);
913
914 /*
915   called when a vacuum fetch has completed - just free it and do the next one
916  */
917 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
918 {
919         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
920         talloc_free(state);
921         vacuum_fetch_next(v);
922 }
923
924
925 /*
926   process the next element from the vacuum list
927 */
928 static void vacuum_fetch_next(struct vacuum_info *v)
929 {
930         struct ctdb_call call;
931         struct ctdb_rec_data *r;
932
933         while (v->recs->count) {
934                 struct ctdb_client_call_state *state;
935                 TDB_DATA data;
936                 struct ctdb_ltdb_header *hdr;
937
938                 ZERO_STRUCT(call);
939                 call.call_id = CTDB_NULL_FUNC;
940                 call.flags = CTDB_IMMEDIATE_MIGRATION;
941                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
942
943                 r = v->r;
944                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
945                 v->recs->count--;
946
947                 call.key.dptr = &r->data[0];
948                 call.key.dsize = r->keylen;
949
950                 /* ensure we don't block this daemon - just skip a record if we can't get
951                    the chainlock */
952                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
953                         continue;
954                 }
955
956                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
957                 if (data.dptr == NULL) {
958                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
959                         continue;
960                 }
961
962                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
963                         free(data.dptr);
964                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
965                         continue;
966                 }
967                 
968                 hdr = (struct ctdb_ltdb_header *)data.dptr;
969                 if (hdr->dmaster == v->rec->ctdb->pnn) {
970                         /* it's already local */
971                         free(data.dptr);
972                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
973                         continue;
974                 }
975
976                 free(data.dptr);
977
978                 state = ctdb_call_send(v->ctdb_db, &call);
979                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
980                 if (state == NULL) {
981                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
982                         talloc_free(v);
983                         return;
984                 }
985                 state->async.fn = vacuum_fetch_callback;
986                 state->async.private_data = v;
987                 return;
988         }
989
990         talloc_free(v);
991 }
992
993
994 /*
995   destroy a vacuum info structure
996  */
997 static int vacuum_info_destructor(struct vacuum_info *v)
998 {
999         DLIST_REMOVE(v->rec->vacuum_info, v);
1000         return 0;
1001 }
1002
1003
1004 /*
1005   handler for vacuum fetch
1006 */
1007 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1008                                  TDB_DATA data, void *private_data)
1009 {
1010         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1011         struct ctdb_marshall_buffer *recs;
1012         int ret, i;
1013         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1014         const char *name;
1015         struct ctdb_dbid_map *dbmap=NULL;
1016         bool persistent = false;
1017         struct ctdb_db_context *ctdb_db;
1018         struct ctdb_rec_data *r;
1019         uint32_t srcnode;
1020         struct vacuum_info *v;
1021
1022         recs = (struct ctdb_marshall_buffer *)data.dptr;
1023         r = (struct ctdb_rec_data *)&recs->data[0];
1024
1025         if (recs->count == 0) {
1026                 talloc_free(tmp_ctx);
1027                 return;
1028         }
1029
1030         srcnode = r->reqid;
1031
1032         for (v=rec->vacuum_info;v;v=v->next) {
1033                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
1034                         /* we're already working on records from this node */
1035                         talloc_free(tmp_ctx);
1036                         return;
1037                 }
1038         }
1039
1040         /* work out if the database is persistent */
1041         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1042         if (ret != 0) {
1043                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1044                 talloc_free(tmp_ctx);
1045                 return;
1046         }
1047
1048         for (i=0;i<dbmap->num;i++) {
1049                 if (dbmap->dbs[i].dbid == recs->db_id) {
1050                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1051                         break;
1052                 }
1053         }
1054         if (i == dbmap->num) {
1055                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1056                 talloc_free(tmp_ctx);
1057                 return;         
1058         }
1059
1060         /* find the name of this database */
1061         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1062                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1063                 talloc_free(tmp_ctx);
1064                 return;
1065         }
1066
1067         /* attach to it */
1068         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1069         if (ctdb_db == NULL) {
1070                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1071                 talloc_free(tmp_ctx);
1072                 return;
1073         }
1074
1075         v = talloc_zero(rec, struct vacuum_info);
1076         if (v == NULL) {
1077                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1078                 talloc_free(tmp_ctx);
1079                 return;
1080         }
1081
1082         v->rec = rec;
1083         v->srcnode = srcnode;
1084         v->ctdb_db = ctdb_db;
1085         v->recs = talloc_memdup(v, recs, data.dsize);
1086         if (v->recs == NULL) {
1087                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1088                 talloc_free(v);
1089                 talloc_free(tmp_ctx);
1090                 return;         
1091         }
1092         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1093
1094         DLIST_ADD(rec->vacuum_info, v);
1095
1096         talloc_set_destructor(v, vacuum_info_destructor);
1097
1098         vacuum_fetch_next(v);
1099         talloc_free(tmp_ctx);
1100 }
1101
1102
1103 /*
1104   called when ctdb_wait_timeout should finish
1105  */
1106 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1107                               struct timeval yt, void *p)
1108 {
1109         uint32_t *timed_out = (uint32_t *)p;
1110         (*timed_out) = 1;
1111 }
1112
1113 /*
1114   wait for a given number of seconds
1115  */
1116 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1117 {
1118         uint32_t timed_out = 0;
1119         time_t usecs = (secs - (time_t)secs) * 1000000;
1120         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1121         while (!timed_out) {
1122                 event_loop_once(ctdb->ev);
1123         }
1124 }
1125
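/* Usage sketch: pump the event loop for a fractional delay, e.g.

        ctdb_wait_timeout(ctdb, 0.3);

   waits roughly 300ms while still servicing events; the sub-second part is
   converted to microseconds above. */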
1126 /*
1127   called when an election times out (ends)
1128  */
1129 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1130                                   struct timeval t, void *p)
1131 {
1132         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1133         rec->election_timeout = NULL;
1134         fast_start = false;
1135
1136         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1137 }
1138
1139
1140 /*
1141   wait for an election to finish. It finished election_timeout seconds after
1142   the last election packet is received
1143  */
1144 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1145 {
1146         struct ctdb_context *ctdb = rec->ctdb;
1147         while (rec->election_timeout) {
1148                 event_loop_once(ctdb->ev);
1149         }
1150 }
1151
1152 /*
1153   Update our local flags from all remote connected nodes.
1154   This is only run when we are, or believe we are, the recovery master
1155  */
1156 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1157 {
1158         int j;
1159         struct ctdb_context *ctdb = rec->ctdb;
1160         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1161
1162         /* get the nodemap for all active remote nodes and verify
1163            they are the same as for this node
1164          */
1165         for (j=0; j<nodemap->num; j++) {
1166                 struct ctdb_node_map *remote_nodemap=NULL;
1167                 int ret;
1168
1169                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1170                         continue;
1171                 }
1172                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1173                         continue;
1174                 }
1175
1176                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1177                                            mem_ctx, &remote_nodemap);
1178                 if (ret != 0) {
1179                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1180                                   nodemap->nodes[j].pnn));
1181                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1182                         talloc_free(mem_ctx);
1183                         return MONITOR_FAILED;
1184                 }
1185                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1186                         /* We should tell our daemon about this so it
1187                            updates its flags or else we will log the same 
1188                            message again in the next iteration of recovery.
1189                            Since we are the recovery master we can just as
1190                            well update the flags on all nodes.
1191                         */
1192                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1193                         if (ret != 0) {
1194                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1195                                 talloc_free(mem_ctx);
                                 return MONITOR_FAILED;
1196                         }
1197
1198                         /* Update our local copy of the flags in the recovery
1199                            daemon.
1200                         */
1201                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1202                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1203                                  nodemap->nodes[j].flags));
1204                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1205                 }
1206                 talloc_free(remote_nodemap);
1207         }
1208         talloc_free(mem_ctx);
1209         return MONITOR_OK;
1210 }
1211
1212
1213 /* Create a new random generation id.
1214    The generation id cannot be the INVALID_GENERATION id
1215 */
1216 static uint32_t new_generation(void)
1217 {
1218         uint32_t generation;
1219
1220         while (1) {
1221                 generation = random();
1222
1223                 if (generation != INVALID_GENERATION) {
1224                         break;
1225                 }
1226         }
1227
1228         return generation;
1229 }
1230
1231
1232 /*
1233   create a temporary working database
1234  */
1235 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1236 {
1237         char *name;
1238         struct tdb_wrap *recdb;
1239         unsigned tdb_flags;
1240
1241         /* open up the temporary recovery database */
1242         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1243                                ctdb->db_directory_state,
1244                                ctdb->pnn);
1245         if (name == NULL) {
1246                 return NULL;
1247         }
1248         unlink(name);
1249
1250         tdb_flags = TDB_NOLOCK;
1251         if (ctdb->valgrinding) {
1252                 tdb_flags |= TDB_NOMMAP;
1253         }
1254         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1255
1256         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1257                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1258         if (recdb == NULL) {
1259                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1260         }
1261
1262         talloc_free(name);
1263
1264         return recdb;
1265 }
1266
1267
1268 /* 
1269    a traverse function for pulling all relevant records from recdb
1270  */
1271 struct recdb_data {
1272         struct ctdb_context *ctdb;
1273         struct ctdb_marshall_buffer *recdata;
1274         uint32_t len;
1275         uint32_t allocated_len;
1276         bool failed;
1277         bool persistent;
1278 };
1279
1280 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1281 {
1282         struct recdb_data *params = (struct recdb_data *)p;
1283         struct ctdb_rec_data *rec;
1284         struct ctdb_ltdb_header *hdr;
1285
1286         /*
1287          * skip empty records - but NOT for persistent databases:
1288          *
1289          * The record-by-record mode of recovery deletes empty records.
1290          * For persistent databases, this can lead to data corruption
1291          * by deleting records that should be there:
1292          *
1293          * - Assume the cluster has been running for a while.
1294          *
1295          * - A record R in a persistent database has been created and
1296          *   deleted a couple of times, the last operation being deletion,
1297          *   leaving an empty record with a high RSN, say 10.
1298          *
1299          * - Now a node N is turned off.
1300          *
1301          * - This leaves the local copy of the database on N with the empty
1302          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1303          *   the copy of record R.
1304          *
1305          * - Now the record is created again while node N is turned off.
1306          *   This creates R with RSN = 1 on all nodes except for N.
1307          *
1308          * - Now node N is turned on again. The following recovery will choose
1309          *   the older empty copy of R due to RSN 10 > RSN 1.
1310          *
1311          * ==> Hence the record is gone after the recovery.
1312          *
1313          * On databases like Samba's registry, this can damage the higher-level
1314          * data structures built from the various tdb-level records.
1315          */
1316         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1317                 return 0;
1318         }
1319
1320         /* update the dmaster field to point to us */
1321         hdr = (struct ctdb_ltdb_header *)data.dptr;
1322         if (!params->persistent) {
1323                 hdr->dmaster = params->ctdb->pnn;
1324                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1325         }
1326
1327         /* add the record to the blob ready to send to the nodes */
1328         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1329         if (rec == NULL) {
1330                 params->failed = true;
1331                 return -1;
1332         }
1333         if (params->len + rec->length >= params->allocated_len) {
1334                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1335                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1336         }
1337         if (params->recdata == NULL) {
1338                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1339                          rec->length + params->len));
1340                 params->failed = true;
1341                 return -1;
1342         }
1343         params->recdata->count++;
1344         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1345         params->len += rec->length;
1346         talloc_free(rec);
1347
1348         return 0;
1349 }
1350
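/* Buffer-growth sketch (illustrative numbers): len counts the bytes used and
   allocated_len the bytes reserved.  Assuming pulldb_preallocation_size were
   10MB, appending a 200-byte record to a full buffer grows the allocation to
   len + 200 + 10MB in a single talloc_realloc_size() call, instead of
   reallocating once per record appended during the traverse. */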
1351 /*
1352   push the recdb database out to all nodes
1353  */
1354 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1355                                bool persistent,
1356                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1357 {
1358         struct recdb_data params;
1359         struct ctdb_marshall_buffer *recdata;
1360         TDB_DATA outdata;
1361         TALLOC_CTX *tmp_ctx;
1362         uint32_t *nodes;
1363
1364         tmp_ctx = talloc_new(ctdb);
1365         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1366
1367         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1368         CTDB_NO_MEMORY(ctdb, recdata);
1369
1370         recdata->db_id = dbid;
1371
1372         params.ctdb = ctdb;
1373         params.recdata = recdata;
1374         params.len = offsetof(struct ctdb_marshall_buffer, data);
1375         params.allocated_len = params.len;
1376         params.failed = false;
1377         params.persistent = persistent;
1378
1379         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1380                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1381                 talloc_free(params.recdata);
1382                 talloc_free(tmp_ctx);
1383                 return -1;
1384         }
1385
1386         if (params.failed) {
1387                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1388                 talloc_free(params.recdata);
1389                 talloc_free(tmp_ctx);
1390                 return -1;              
1391         }
1392
1393         recdata = params.recdata;
1394
1395         outdata.dptr = (void *)recdata;
1396         outdata.dsize = params.len;
1397
1398         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1399         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1400                                         nodes, 0,
1401                                         CONTROL_TIMEOUT(), false, outdata,
1402                                         NULL, NULL,
1403                                         NULL) != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1405                 talloc_free(recdata);
1406                 talloc_free(tmp_ctx);
1407                 return -1;
1408         }
1409
1410         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1411                   dbid, recdata->count));
1412
1413         talloc_free(recdata);
1414         talloc_free(tmp_ctx);
1415
1416         return 0;
1417 }
1418
1419
1420 /*
1421   go through a full recovery on one database 
1422  */
1423 static int recover_database(struct ctdb_recoverd *rec, 
1424                             TALLOC_CTX *mem_ctx,
1425                             uint32_t dbid,
1426                             bool persistent,
1427                             uint32_t pnn, 
1428                             struct ctdb_node_map *nodemap,
1429                             uint32_t transaction_id)
1430 {
1431         struct tdb_wrap *recdb;
1432         int ret;
1433         struct ctdb_context *ctdb = rec->ctdb;
1434         TDB_DATA data;
1435         struct ctdb_control_wipe_database w;
1436         uint32_t *nodes;
1437
1438         recdb = create_recdb(ctdb, mem_ctx);
1439         if (recdb == NULL) {
1440                 return -1;
1441         }
1442
1443         /* pull all remote databases onto the recdb */
1444         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1445         if (ret != 0) {
1446                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1447                 return -1;
1448         }
1449
1450         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1451
1452         /* wipe all the remote databases. This is safe as we are in a transaction */
1453         w.db_id = dbid;
1454         w.transaction_id = transaction_id;
1455
1456         data.dptr = (void *)&w;
1457         data.dsize = sizeof(w);
1458
1459         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1460         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1461                                         nodes, 0,
1462                                         CONTROL_TIMEOUT(), false, data,
1463                                         NULL, NULL,
1464                                         NULL) != 0) {
1465                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1466                 talloc_free(recdb);
1467                 return -1;
1468         }
1469         
1470         /* push out the correct database. This sets the dmaster and skips 
1471            the empty records */
1472         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1473         if (ret != 0) {
1474                 talloc_free(recdb);
1475                 return -1;
1476         }
1477
1478         /* all done with this database */
1479         talloc_free(recdb);
1480
1481         return 0;
1482 }
1483
1484 /*
1485   reload the nodes file 
1486 */
1487 static void reload_nodes_file(struct ctdb_context *ctdb)
1488 {
1489
1490         ctdb_load_nodes_file(ctdb);
1491 }
1492
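/*
  fetch the known and available public IP lists from all active nodes and
  cache them on the local node structures.  On failure the node that
  caused the problem is reported via *culprit (if non-NULL).
 */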
1493 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1494                                          struct ctdb_recoverd *rec,
1495                                          struct ctdb_node_map *nodemap,
1496                                          uint32_t *culprit)
1497 {
1498         int j;
1499         int ret;
1500
1501         if (ctdb->num_nodes != nodemap->num) {
1502                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1503                                   ctdb->num_nodes, nodemap->num));
1504                 if (culprit) {
1505                         *culprit = ctdb->pnn;
1506                 }
1507                 return -1;
1508         }
1509
1510         for (j=0; j<nodemap->num; j++) {
1511                 /* For readability */
1512                 struct ctdb_node *node = ctdb->nodes[j];
1513
1514                 /* release any existing data */
1515                 if (node->known_public_ips) {
1516                         talloc_free(node->known_public_ips);
1517                         node->known_public_ips = NULL;
1518                 }
1519                 if (node->available_public_ips) {
1520                         talloc_free(node->available_public_ips);
1521                         node->available_public_ips = NULL;
1522                 }
1523
1524                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1525                         continue;
1526                 }
1527
1528                 /* Retrieve the list of known public IPs from the node */
1529                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1530                                         CONTROL_TIMEOUT(),
1531                                         node->pnn,
1532                                         ctdb->nodes,
1533                                         0,
1534                                         &node->known_public_ips);
1535                 if (ret != 0) {
1536                         DEBUG(DEBUG_ERR,
1537                               ("Failed to read known public IPs from node: %u\n",
1538                                node->pnn));
1539                         if (culprit) {
1540                                 *culprit = node->pnn;
1541                         }
1542                         return -1;
1543                 }
1544
1545                 if (ctdb->do_checkpublicip &&
1546                     rec->takeover_runs_disable_ctx == NULL &&
1547                     verify_remote_ip_allocation(ctdb,
1548                                                  node->known_public_ips,
1549                                                  node->pnn)) {
1550                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1551                         rec->need_takeover_run = true;
1552                 }
1553
1554                 /* Retrieve the list of available public IPs from the node */
1555                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1556                                         CONTROL_TIMEOUT(),
1557                                         node->pnn,
1558                                         ctdb->nodes,
1559                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1560                                         &node->available_public_ips);
1561                 if (ret != 0) {
1562                         DEBUG(DEBUG_ERR,
1563                               ("Failed to read available public IPs from node: %u\n",
1564                                node->pnn));
1565                         if (culprit) {
1566                                 *culprit = node->pnn;
1567                         }
1568                         return -1;
1569                 }
1570         }
1571
1572         return 0;
1573 }
1574
1575 /* when we start a recovery, make sure all nodes use the same reclock file
1576    setting
1577 */
1578 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1579 {
1580         struct ctdb_context *ctdb = rec->ctdb;
1581         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1582         TDB_DATA data;
1583         uint32_t *nodes;
1584
1585         if (ctdb->recovery_lock_file == NULL) {
1586                 data.dptr  = NULL;
1587                 data.dsize = 0;
1588         } else {
1589                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1590                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1591         }
1592
1593         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1594         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1595                                         nodes, 0,
1596                                         CONTROL_TIMEOUT(),
1597                                         false, data,
1598                                         NULL, NULL,
1599                                         rec) != 0) {
1600                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1601                 talloc_free(tmp_ctx);
1602                 return -1;
1603         }
1604
1605         talloc_free(tmp_ctx);
1606         return 0;
1607 }
1608
1609
1610 /*
1611  * this callback is called for every node that failed to execute ctdb_takeover_run()
1612  * and, when banning credits are enabled, marks that node as a recovery culprit.
1613  */
1614 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1615 {
1616         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1617
1618         if (callback_data != NULL) {
1619                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1620
1621                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1622
1623                 ctdb_set_culprit(rec, node_pnn);
1624         }
1625 }
1626
1627
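/*
  ban any node that has accumulated too many banning credits
  (2 * number of nodes).  If the local node ends up banning itself,
  *self_ban is set so the caller can abort what it was doing.
 */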
1628 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1629 {
1630         struct ctdb_context *ctdb = rec->ctdb;
1631         int i;
1632         struct ctdb_banning_state *ban_state;
1633
1634         *self_ban = false;
1635         for (i=0; i<ctdb->num_nodes; i++) {
1636                 if (ctdb->nodes[i]->ban_state == NULL) {
1637                         continue;
1638                 }
1639                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1640                 if (ban_state->count < 2*ctdb->num_nodes) {
1641                         continue;
1642                 }
1643
1644                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1645                         ctdb->nodes[i]->pnn, ban_state->count,
1646                         ctdb->tunable.recovery_ban_period));
1647                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1648                 ban_state->count = 0;
1649
1650                 /* Banning ourself? */
1651                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1652                         *self_ban = true;
1653                 }
1654         }
1655 }
1656
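/*
  perform a takeover run (public IP reallocation) across the cluster.
  Takeover runs on the other nodes are temporarily disabled while this
  run is in progress.  Returns true on success; on failure
  rec->need_takeover_run is set so the run will be retried later.
 */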
1657 static bool do_takeover_run(struct ctdb_recoverd *rec,
1658                             struct ctdb_node_map *nodemap,
1659                             bool banning_credits_on_fail)
1660 {
1661         uint32_t *nodes = NULL;
1662         struct srvid_request dtr;
1663         TDB_DATA data;
1664         int i;
1665         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1666         int ret;
1667         bool ok;
1668
1669         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1670
1671         if (rec->takeover_run_in_progress) {
1672                 DEBUG(DEBUG_ERR, (__location__
1673                                   " takeover run already in progress \n"));
1674                 ok = false;
1675                 goto done;
1676         }
1677
1678         rec->takeover_run_in_progress = true;
1679
1680         /* If takeover runs are disabled then fail... */
1681         if (rec->takeover_runs_disable_ctx != NULL) {
1682                 DEBUG(DEBUG_ERR,
1683                       ("Takeover runs are disabled so refusing to run one\n"));
1684                 ok = false;
1685                 goto done;
1686         }
1687
1688         /* Disable IP checks (takeover runs, really) on other nodes
1689          * while doing this takeover run.  This will stop those other
1690          * nodes from triggering takeover runs when they think they should
1691          * be hosting an IP but it isn't yet on an interface.  Don't
1692          * wait for replies since a failure here might cause some
1693          * noise in the logs but will not actually cause a problem.
1694          */
1695         dtr.srvid = 0; /* No reply */
1696         dtr.pnn = -1;
1697
1698         data.dptr  = (uint8_t*)&dtr;
1699         data.dsize = sizeof(dtr);
1700
1701         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1702
1703         /* Disable for 5 minutes.  This can be a tunable later if
1704          * necessary.
1705          */
1706         dtr.data = 300;
1707         for (i = 0; i < talloc_array_length(nodes); i++) {
1708                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1709                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1710                                              data) != 0) {
1711                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1712                 }
1713         }
1714
1715         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1716                                 rec->force_rebalance_nodes,
1717                                 takeover_fail_callback,
1718                                 banning_credits_on_fail ? rec : NULL);
1719
1720         /* Reenable takeover runs and IP checks on other nodes */
1721         dtr.data = 0;
1722         for (i = 0; i < talloc_array_length(nodes); i++) {
1723                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1724                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1725                                              data) != 0) {
1726                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1727                 }
1728         }
1729
1730         if (ret != 0) {
1731                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1732                 ok = false;
1733                 goto done;
1734         }
1735
1736         ok = true;
1737         /* Takeover run was successful so clear force rebalance targets */
1738         if (rebalance_nodes == rec->force_rebalance_nodes) {
1739                 TALLOC_FREE(rec->force_rebalance_nodes);
1740         } else {
1741                 DEBUG(DEBUG_WARNING,
1742                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1743         }
1744 done:
1745         rec->need_takeover_run = !ok;
1746         talloc_free(nodes);
1747         rec->takeover_run_in_progress = false;
1748
1749         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1750         return ok;
1751 }
1752
1753
1754 /*
1755   we are the recmaster, and recovery is needed - start a recovery run
1756  */
1757 static int do_recovery(struct ctdb_recoverd *rec, 
1758                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1759                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1760 {
1761         struct ctdb_context *ctdb = rec->ctdb;
1762         int i, j, ret;
1763         uint32_t generation;
1764         struct ctdb_dbid_map *dbmap;
1765         TDB_DATA data;
1766         uint32_t *nodes;
1767         struct timeval start_time;
1768         uint32_t culprit = (uint32_t)-1;
1769         bool self_ban;
1770
1771         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1772
1773         /* if recovery fails, force it again */
1774         rec->need_recovery = true;
1775
1776         ban_misbehaving_nodes(rec, &self_ban);
1777         if (self_ban) {
1778                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1779                 return -1;
1780         }
1781
1782         if (ctdb->tunable.verify_recovery_lock != 0) {
1783                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1784                 start_time = timeval_current();
1785                 if (!ctdb_recovery_lock(ctdb, true)) {
1786                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1787                                          "and ban ourself for %u seconds\n",
1788                                          ctdb->tunable.recovery_ban_period));
1789                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1790                         return -1;
1791                 }
1792                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1793                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1794         }
1795
1796         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1797
1798         /* get a list of all databases */
1799         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1800         if (ret != 0) {
1801                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1802                 return -1;
1803         }
1804
1805         /* we do the db creation before we set the recovery mode, so the freeze happens
1806            on all databases we will be dealing with. */
1807
1808         /* verify that we have all the databases any other node has */
1809         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1812                 return -1;
1813         }
1814
1815         /* verify that all other nodes have all our databases */
1816         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1817         if (ret != 0) {
1818                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1819                 return -1;
1820         }
1821         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1822
1823         /* update the database priority for all remote databases */
1824         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1825         if (ret != 0) {
1826                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1827         }
1828         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1829
1830
1831         /* update all other nodes to use the same setting for reclock files
1832            as the local recovery master.
1833         */
1834         sync_recovery_lock_file_across_cluster(rec);
1835
1836         /* set recovery mode to active on all nodes */
1837         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1838         if (ret != 0) {
1839                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1840                 return -1;
1841         }
1842
1843         /* execute the "startrecovery" event script on all nodes */
1844         ret = run_startrecovery_eventscript(rec, nodemap);
1845         if (ret!=0) {
1846                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1847                 return -1;
1848         }
1849
1850         /*
1851           update all nodes to have the same flags that we have
1852          */
1853         for (i=0;i<nodemap->num;i++) {
1854                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1855                         continue;
1856                 }
1857
1858                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1859                 if (ret != 0) {
1860                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1861                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
1862                         } else {
1863                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1864                                 return -1;
1865                         }
1866                 }
1867         }
1868
1869         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1870
1871         /* pick a new generation number */
1872         generation = new_generation();
1873
1874         /* change the vnnmap on this node to use the new generation 
1875            number but not on any other nodes.
1876            this guarantees that if we abort the recovery prematurely
1877            for some reason (a node stops responding?)
1878            that we can just return immediately and we will reenter
1879            recovery shortly again.
1880            I.e. we deliberately leave the cluster with an inconsistent
1881            generation id to allow us to abort recovery at any stage and
1882            just restart it from scratch.
1883          */
1884         vnnmap->generation = generation;
1885         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1886         if (ret != 0) {
1887                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1888                 return -1;
1889         }
1890
1891         data.dptr = (void *)&generation;
1892         data.dsize = sizeof(uint32_t);
1893
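        /* start a transaction on all active nodes, keyed on the new
           generation id, so that the wipe and repopulation of the
           databases below happen atomically */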
1894         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1895         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1896                                         nodes, 0,
1897                                         CONTROL_TIMEOUT(), false, data,
1898                                         NULL,
1899                                         transaction_start_fail_callback,
1900                                         rec) != 0) {
1901                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1902                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1903                                         nodes, 0,
1904                                         CONTROL_TIMEOUT(), false, tdb_null,
1905                                         NULL,
1906                                         NULL,
1907                                         NULL) != 0) {
1908                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1909                 }
1910                 return -1;
1911         }
1912
1913         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1914
1915         for (i=0;i<dbmap->num;i++) {
1916                 ret = recover_database(rec, mem_ctx,
1917                                        dbmap->dbs[i].dbid,
1918                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1919                                        pnn, nodemap, generation);
1920                 if (ret != 0) {
1921                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1922                         return -1;
1923                 }
1924         }
1925
1926         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1927
1928         /* commit all the changes */
1929         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1930                                         nodes, 0,
1931                                         CONTROL_TIMEOUT(), false, data,
1932                                         NULL, NULL,
1933                                         NULL) != 0) {
1934                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1935                 return -1;
1936         }
1937
1938         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1939         
1940
1941         /* update the capabilities for all nodes */
1942         ret = update_capabilities(ctdb, nodemap);
1943         if (ret!=0) {
1944                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1945                 return -1;
1946         }
1947
1948         /* build a new vnn map with all the currently active and
1949            unbanned nodes */
1950         generation = new_generation();
1951         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1952         CTDB_NO_MEMORY(ctdb, vnnmap);
1953         vnnmap->generation = generation;
1954         vnnmap->size = 0;
1955         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1956         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1957         for (i=j=0;i<nodemap->num;i++) {
1958                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1959                         continue;
1960                 }
1961                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1962                         /* this node cannot be an lmaster */
1963                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1964                         continue;
1965                 }
1966
1967                 vnnmap->size++;
1968                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1969                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1970                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1971
1972         }
1973         if (vnnmap->size == 0) {
1974                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1975                 vnnmap->size++;
1976                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1977                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1978                 vnnmap->map[0] = pnn;
1979         }       
1980
1981         /* update to the new vnnmap on all nodes */
1982         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1983         if (ret != 0) {
1984                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1985                 return -1;
1986         }
1987
1988         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1989
1990         /* update recmaster to point to us for all nodes */
1991         ret = set_recovery_master(ctdb, nodemap, pnn);
1992         if (ret!=0) {
1993                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1994                 return -1;
1995         }
1996
1997         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1998
1999         /*
2000           update all nodes to have the same flags that we have
2001          */
2002         for (i=0;i<nodemap->num;i++) {
2003                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2004                         continue;
2005                 }
2006
2007                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2008                 if (ret != 0) {
2009                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2010                         return -1;
2011                 }
2012         }
2013
2014         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2015
2016         /* disable recovery mode */
2017         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2018         if (ret != 0) {
2019                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2020                 return -1;
2021         }
2022
2023         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2024
2025         /* Fetch known/available public IPs from each active node */
2026         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2029                                  culprit));
2030                 rec->need_takeover_run = true;
2031                 return -1;
2032         }
2033
2034         do_takeover_run(rec, nodemap, false);
2035
2036         /* execute the "recovered" event script on all nodes */
2037         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2038         if (ret!=0) {
2039                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2040                 return -1;
2041         }
2042
2043         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2044
2045         /* send a message to all clients telling them that the cluster 
2046            has been reconfigured */
2047         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
2048
2049         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2050
2051         rec->need_recovery = false;
2052
2053         /* we managed to complete a full recovery, make sure to forgive
2054            any past sins by the nodes that could now participate in the
2055            recovery.
2056         */
2057         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2058         for (i=0;i<nodemap->num;i++) {
2059                 struct ctdb_banning_state *ban_state;
2060
2061                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2062                         continue;
2063                 }
2064
2065                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2066                 if (ban_state == NULL) {
2067                         continue;
2068                 }
2069
2070                 ban_state->count = 0;
2071         }
2072
2073
2074         /* We just finished a recovery successfully. 
2075            We now wait for rerecovery_timeout before we allow 
2076            another recovery to take place.
2077         */
2078         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2079         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2080         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2081
2082         return 0;
2083 }
2084
2085
2086 /*
2087   elections are won by first checking the number of connected nodes, then
2088   the priority time, then the pnn
2089  */
2090 struct election_message {
2091         uint32_t num_connected;
2092         struct timeval priority_time;
2093         uint32_t pnn;
2094         uint32_t node_flags;
2095 };
2096
2097 /*
2098   form this node's election data
2099  */
2100 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2101 {
2102         int ret, i;
2103         struct ctdb_node_map *nodemap;
2104         struct ctdb_context *ctdb = rec->ctdb;
2105
2106         ZERO_STRUCTP(em);
2107
2108         em->pnn = rec->ctdb->pnn;
2109         em->priority_time = rec->priority_time;
2110
2111         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2112         if (ret != 0) {
2113                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
2114                 return;
2115         }
2116
2117         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2118         em->node_flags = rec->node_flags;
2119
2120         for (i=0;i<nodemap->num;i++) {
2121                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2122                         em->num_connected++;
2123                 }
2124         }
2125
2126         /* we shouldn't try to win this election if we can't be a recmaster */
2127         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2128                 em->num_connected = 0;
2129                 em->priority_time = timeval_current();
2130         }
2131
2132         talloc_free(nodemap);
2133 }
2134
2135 /*
2136   see if the given election data wins
2137  */
2138 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2139 {
2140         struct election_message myem;
2141         int cmp = 0;
2142
2143         ctdb_election_data(rec, &myem);
2144
2145         /* we can't win if we don't have the recmaster capability */
2146         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2147                 return false;
2148         }
2149
2150         /* we can't win if we are banned */
2151         if (rec->node_flags & NODE_FLAGS_BANNED) {
2152                 return false;
2153         }
2154
2155         /* we can't win if we are stopped */
2156         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2157                 return false;
2158         }
2159
2160         /* we will automatically win if the other node is banned */
2161         if (em->node_flags & NODE_FLAGS_BANNED) {
2162                 return true;
2163         }
2164
2165         /* we will automatically win if the other node is stopped */
2166         if (em->node_flags & NODE_FLAGS_STOPPED) {
2167                 return true;
2168         }
2169
2170         /* try to use the most connected node */
2171         if (cmp == 0) {
2172                 cmp = (int)myem.num_connected - (int)em->num_connected;
2173         }
2174
2175         /* then the longest running node */
2176         if (cmp == 0) {
2177                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2178         }
2179
2180         if (cmp == 0) {
2181                 cmp = (int)myem.pnn - (int)em->pnn;
2182         }
2183
2184         return cmp > 0;
2185 }
2186
2187 /*
2188   send out an election request
2189  */
2190 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2191 {
2192         int ret;
2193         TDB_DATA election_data;
2194         struct election_message emsg;
2195         uint64_t srvid;
2196         struct ctdb_context *ctdb = rec->ctdb;
2197
2198         srvid = CTDB_SRVID_RECOVERY;
2199
2200         ctdb_election_data(rec, &emsg);
2201
2202         election_data.dsize = sizeof(struct election_message);
2203         election_data.dptr  = (unsigned char *)&emsg;
2204
2205
2206         /* send an election message to all active nodes */
2207         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2208         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2209
2210
2211         /* A new node that is already frozen has entered the cluster.
2212            The existing nodes are not frozen and don't need to be frozen
2213            until the election has ended and we start the actual recovery
2214         */
2215         if (update_recmaster == true) {
2216                 /* first we assume we will win the election and set 
2217                    recoverymaster to be ourself on the current node
2218                  */
2219                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2220                 if (ret != 0) {
2221                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2222                         return -1;
2223                 }
2224         }
2225
2226
2227         return 0;
2228 }
2229
2230 /*
2231   this function will unban all nodes in the cluster
2232 */
2233 static void unban_all_nodes(struct ctdb_context *ctdb)
2234 {
2235         int ret, i;
2236         struct ctdb_node_map *nodemap;
2237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2238         
2239         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2240         if (ret != 0) {
2241                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2242                 return;
2243         }
2244
2245         for (i=0;i<nodemap->num;i++) {
2246                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2247                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2248                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2249                 }
2250         }
2251
2252         talloc_free(tmp_ctx);
2253 }
2254
2255
2256 /*
2257   we think we are winning the election - send a broadcast election request
2258  */
2259 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2260 {
2261         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2262         int ret;
2263
2264         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2265         if (ret != 0) {
2266                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2267         }
2268
2269         talloc_free(rec->send_election_te);
2270         rec->send_election_te = NULL;
2271 }
2272
2273 /*
2274   handler for memory dumps
2275 */
2276 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2277                              TDB_DATA data, void *private_data)
2278 {
2279         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2280         TDB_DATA *dump;
2281         int ret;
2282         struct srvid_request *rd;
2283
2284         if (data.dsize != sizeof(struct srvid_request)) {
2285                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2286                 talloc_free(tmp_ctx);
2287                 return;
2288         }
2289         rd = (struct srvid_request *)data.dptr;
2290
2291         dump = talloc_zero(tmp_ctx, TDB_DATA);
2292         if (dump == NULL) {
2293                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2294                 talloc_free(tmp_ctx);
2295                 return;
2296         }
2297         ret = ctdb_dump_memory(ctdb, dump);
2298         if (ret != 0) {
2299                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2300                 talloc_free(tmp_ctx);
2301                 return;
2302         }
2303
2304         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2305
2306         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2307         if (ret != 0) {
2308                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2309                 talloc_free(tmp_ctx);
2310                 return;
2311         }
2312
2313         talloc_free(tmp_ctx);
2314 }
2315
2316 /*
2317   handler for getlog
2318 */
2319 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2320                            TDB_DATA data, void *private_data)
2321 {
2322         struct ctdb_get_log_addr *log_addr;
2323         pid_t child;
2324
2325         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2326                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2327                 return;
2328         }
2329         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2330
2331         child = ctdb_fork_no_free_ringbuffer(ctdb);
2332         if (child == (pid_t)-1) {
2333                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2334                 return;
2335         }
2336
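        /* in the child: become a ctdb client and send the collected log
           ringbuffer to the requested address, then exit */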
2337         if (child == 0) {
2338                 ctdb_set_process_name("ctdb_rec_log_collector");
2339                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2340                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2341                         _exit(1);
2342                 }
2343                 ctdb_collect_log(ctdb, log_addr);
2344                 _exit(0);
2345         }
2346 }
2347
2348 /*
2349   handler for clearlog
2350 */
2351 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2352                              TDB_DATA data, void *private_data)
2353 {
2354         ctdb_clear_log(ctdb);
2355 }
2356
2357 /*
2358   handler for reload_nodes
2359 */
2360 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2361                              TDB_DATA data, void *private_data)
2362 {
2363         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2364
2365         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2366
2367         reload_nodes_file(rec->ctdb);
2368 }
2369
2370
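/*
  timer callback: the deferred rebalance timeout has expired, so trigger
  a takeover run for the queued rebalance target nodes (if any)
 */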
2371 static void ctdb_rebalance_timeout(struct event_context *ev,
2372                                    struct timed_event *te,
2373                                    struct timeval t, void *p)
2374 {
2375         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2376
2377         if (rec->force_rebalance_nodes == NULL) {
2378                 DEBUG(DEBUG_ERR,
2379                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2380                 return;
2381         }
2382
2383         DEBUG(DEBUG_NOTICE,
2384               ("Rebalance timeout occurred - do takeover run\n"));
2385         do_takeover_run(rec, rec->nodemap, false);
2386 }
2387
2388         
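/*
  handler for node rebalance requests (e.g. when a node has been added):
  remember the node as a rebalance target and (re)start the deferred
  rebalance timer
 */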
2389 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2390                                         uint64_t srvid,
2391                                         TDB_DATA data, void *private_data)
2392 {
2393         uint32_t pnn;
2394         uint32_t *t;
2395         int len;
2396         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2397
2398         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2399                 return;
2400         }
2401
2402         if (data.dsize != sizeof(uint32_t)) {
2403                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2404                 return;
2405         }
2406
2407         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2408                 return;
2409         }
2410
2411         pnn = *(uint32_t *)&data.dptr[0];
2412
2413         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2414
2415         /* Copy any existing list of nodes.  There's probably some
2416          * sort of realloc variant that will do this but we need to
2417          * make sure that freeing the old array also cancels the timer
2418          * event for the timeout... not sure if realloc will do that.
2419          */
2420         len = (rec->force_rebalance_nodes != NULL) ?
2421                 talloc_array_length(rec->force_rebalance_nodes) :
2422                 0;
2423
2424         /* This allows duplicates to be added but they don't cause
2425          * harm.  A call to add a duplicate PNN arguably means that
2426          * the timeout should be reset, so this is the simplest
2427          * solution.
2428          */
2429         t = talloc_zero_array(rec, uint32_t, len+1);
2430         CTDB_NO_MEMORY_VOID(ctdb, t);
2431         if (len > 0) {
2432                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2433         }
2434         t[len] = pnn;
2435
2436         talloc_free(rec->force_rebalance_nodes);
2437
2438         rec->force_rebalance_nodes = t;
2439         event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2440                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2441                         ctdb_rebalance_timeout, rec);
2442 }
2443
2444
2445
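/*
  handler for public IP update messages: on the recovery master, record
  the new assignment of the IP address in the IP assignment tree
 */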
2446 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2447                              TDB_DATA data, void *private_data)
2448 {
2449         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2450         struct ctdb_public_ip *ip;
2451
2452         if (rec->recmaster != rec->ctdb->pnn) {
2453                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2454                 return;
2455         }
2456
2457         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2458                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2459                 return;
2460         }
2461
2462         ip = (struct ctdb_public_ip *)data.dptr;
2463
2464         update_ip_assignment_tree(rec->ctdb, ip);
2465 }
2466
2467
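/* reenable takeover runs by freeing the context that marks them as disabled */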
2468 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2469 {
2470         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2471 }
2472
2473 static void reenable_takeover_runs(struct event_context *ev,
2474                                    struct timed_event *te,
2475                                    struct timeval yt, void *p)
2476 {
2477         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2478
2479         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2480         clear_takeover_runs_disable(rec);
2481 }
2482
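/*
  handler for requests to disable takeover runs for a number of seconds
  (a timeout of 0 reenables them).  Replies with our PNN on success or a
  negative errno value on failure.
 */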
2483 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2484                                           uint64_t srvid, TDB_DATA data,
2485                                           void *private_data)
2486 {
2487         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2488                                                     struct ctdb_recoverd);
2489         struct srvid_request *r;
2490         uint32_t timeout;
2491         TDB_DATA result;
2492         int32_t ret = 0;
2493
2494         /* Validate input data */
2495         if (data.dsize != sizeof(struct srvid_request)) {
2496                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2497                                  "expecting %lu\n", (long unsigned)data.dsize,
2498                                  (long unsigned)sizeof(struct srvid_request)));
2499                 ret = -EINVAL;
2500                 goto done;
2501         }
2502         if (data.dptr == NULL) {
2503                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2504                 ret = -EINVAL;
2505                 goto done;
2506         }
2507
2508         r = (struct srvid_request *)data.dptr;
2509         timeout = r->data;
2510
2511         if (timeout == 0) {
2512                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2513                 clear_takeover_runs_disable(rec);
2514                 ret = ctdb_get_pnn(ctdb);
2515                 goto done;
2516         }
2517
2518         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
2519                 DEBUG(DEBUG_ERR,
2520                       ("Refusing to disable takeover runs on inactive node\n"));
2521                 ret = -EHOSTDOWN;
2522                 goto done;
2523         }
2524
2525         if (rec->takeover_run_in_progress) {
2526                 DEBUG(DEBUG_ERR,
2527                       ("Unable to disable takeover runs - in progress\n"));
2528                 ret = -EAGAIN;
2529                 goto done;
2530         }
2531
2532         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2533
2534         /* Clear any old timers */
2535         clear_takeover_runs_disable(rec);
2536
2537         /* When this is non-NULL it indicates that takeover runs are
2538          * disabled.  This context also holds the timeout timer.
2539          */
2540         rec->takeover_runs_disable_ctx = talloc_new(rec);
2541         if (rec->takeover_runs_disable_ctx == NULL) {
2542                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2543                 ret = -ENOMEM;
2544                 goto done;
2545         }
2546
2547         /* Arrange for the timeout to occur */
2548         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2549                         timeval_current_ofs(timeout, 0),
2550                         reenable_takeover_runs,
2551                         rec);
2552
2553         /* Returning our PNN tells the caller that we succeeded */
2554         ret = ctdb_get_pnn(ctdb);
2555 done:
2556         result.dsize = sizeof(int32_t);
2557         result.dptr  = (uint8_t *)&ret;
2558         srvid_request_reply(ctdb, r, result);
2559 }
2560
2561 /* Backward compatibility for this SRVID - call
2562  * disable_takeover_runs_handler() instead
2563  */
2564 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2565                                      TDB_DATA data, void *private_data)
2566 {
2567         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2568                                                     struct ctdb_recoverd);
2569         TDB_DATA data2;
2570         struct srvid_request *req;
2571
2572         if (data.dsize != sizeof(uint32_t)) {
2573                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2574                                  "expecting %lu\n", (long unsigned)data.dsize,
2575                                  (long unsigned)sizeof(uint32_t)));
2576                 return;
2577         }
2578         if (data.dptr == NULL) {
2579                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2580                 return;
2581         }
2582
2583         req = talloc(ctdb, struct srvid_request);
2584         CTDB_NO_MEMORY_VOID(ctdb, req);
2585
2586         req->srvid = 0; /* No reply */
2587         req->pnn = -1;
2588         req->data = *((uint32_t *)data.dptr); /* Timeout */
2589
2590         data2.dsize = sizeof(*req);
2591         data2.dptr = (uint8_t *)req;
2592
2593         disable_takeover_runs_handler(rec->ctdb,
2594                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2595                                       data2, rec);
2596 }
2597
2598 /*
2599   handler for ip reallocate: just add the request to the list and handle it
2600   later in the monitor_cluster loop, so that we do not recurse into
2601   takeover_run() from within other requests
2602 */
2603 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2604                                   TDB_DATA data, void *private_data)
2605 {
2606         struct srvid_request *request;
2607         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2608                                                     struct ctdb_recoverd);
2609
2610         if (data.dsize != sizeof(struct srvid_request)) {
2611                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2612                 return;
2613         }
2614
2615         request = (struct srvid_request *)data.dptr;
2616
2617         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2618 }
2619
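/*
  process the queued ip reallocate requests: refresh the public IP
  information from all nodes, do a takeover run and send the result back
  to every waiting requester
 */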
2620 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2621                                           struct ctdb_recoverd *rec)
2622 {
2623         TDB_DATA result;
2624         int32_t ret;
2625         uint32_t culprit;
2626
2627         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2628
2629         /* update the list of public ips that a node can handle for
2630            all connected nodes
2631         */
2632         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2633         if (ret != 0) {
2634                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2635                                  culprit));
2636                 rec->need_takeover_run = true;
2637         }
2638         if (ret == 0) {
2639                 if (do_takeover_run(rec, rec->nodemap, false)) {
2640                         ret = ctdb_get_pnn(ctdb);
2641                 } else {
2642                         ret = -1;
2643                 }
2644         }
2645
2646         result.dsize = sizeof(int32_t);
2647         result.dptr  = (uint8_t *)&ret;
2648
2649         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2650 }
2651
2652
2653 /*
2654   handler for recovery master elections
2655 */
2656 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2657                              TDB_DATA data, void *private_data)
2658 {
2659         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2660         int ret;
2661         struct election_message *em = (struct election_message *)data.dptr;
2662         TALLOC_CTX *mem_ctx;
2663
2664         /* we got an election packet - update the timeout for the election */
2665         talloc_free(rec->election_timeout);
2666         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2667                                                 fast_start ?
2668                                                 timeval_current_ofs(0, 500000) :
2669                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2670                                                 ctdb_election_timeout, rec);
2671
2672         mem_ctx = talloc_new(ctdb);
2673
2674         /* someone called an election. check their election data
2675            and if we disagree and we would rather be the elected node, 
2676            send a new election message to all other nodes
2677          */
2678         if (ctdb_election_win(rec, em)) {
2679                 if (!rec->send_election_te) {
2680                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2681                                                                 timeval_current_ofs(0, 500000),
2682                                                                 election_send_request, rec);
2683                 }
2684                 talloc_free(mem_ctx);
2685                 /*unban_all_nodes(ctdb);*/
2686                 return;
2687         }
2688         
2689         /* we didn't win */
2690         talloc_free(rec->send_election_te);
2691         rec->send_election_te = NULL;
2692
2693         if (ctdb->tunable.verify_recovery_lock != 0) {
2694                 /* release the recmaster lock */
2695                 if (em->pnn != ctdb->pnn &&
2696                     ctdb->recovery_lock_fd != -1) {
2697                         close(ctdb->recovery_lock_fd);
2698                         ctdb->recovery_lock_fd = -1;
2699                         unban_all_nodes(ctdb);
2700                 }
2701         }
2702
2703         /* ok, let that node become recmaster then */
2704         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2705         if (ret != 0) {
2706                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2707                 talloc_free(mem_ctx);
2708                 return;
2709         }
2710
2711         talloc_free(mem_ctx);
2712         return;
2713 }
2714
2715
2716 /*
2717   force the start of the election process
2718  */
2719 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2720                            struct ctdb_node_map *nodemap)
2721 {
2722         int ret;
2723         struct ctdb_context *ctdb = rec->ctdb;
2724
2725         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2726
2727         /* set all nodes to recovery mode to stop all internode traffic */
2728         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2729         if (ret != 0) {
2730                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2731                 return;
2732         }
2733
2734         talloc_free(rec->election_timeout);
2735         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2736                                                 fast_start ?
2737                                                 timeval_current_ofs(0, 500000) :
2738                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2739                                                 ctdb_election_timeout, rec);
2740
2741         ret = send_election_request(rec, pnn, true);
2742         if (ret!=0) {
2743                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2744                 return;
2745         }
2746
2747         /* wait for a few seconds to collect all responses */
2748         ctdb_wait_election(rec);
2749 }
2750
2751
2752
2753 /*
2754   handler for when a node changes its flags
2755 */
2756 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2757                             TDB_DATA data, void *private_data)
2758 {
2759         int ret;
2760         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2761         struct ctdb_node_map *nodemap=NULL;
2762         TALLOC_CTX *tmp_ctx;
2763         int i;
2764         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2765         int disabled_flag_changed;
2766
2767         if (data.dsize != sizeof(*c)) {
2768                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2769                 return;
2770         }
2771
2772         tmp_ctx = talloc_new(ctdb);
2773         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2774
2775         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2776         if (ret != 0) {
2777                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2778                 talloc_free(tmp_ctx);
2779                 return;         
2780         }
2781
2782
2783         for (i=0;i<nodemap->num;i++) {
2784                 if (nodemap->nodes[i].pnn == c->pnn) break;
2785         }
2786
2787         if (i == nodemap->num) {
2788                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2789                 talloc_free(tmp_ctx);
2790                 return;
2791         }
2792
2793         if (c->old_flags != c->new_flags) {
2794                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2795         }
2796
2797         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2798
2799         nodemap->nodes[i].flags = c->new_flags;
2800
2801         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2802                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2803
2804         if (ret == 0) {
2805                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2806                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2807         }
2808         
2809         if (ret == 0 &&
2810             ctdb->recovery_master == ctdb->pnn &&
2811             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2812                 /* Only do the takeover run if the permanently-disabled or
2813                    unhealthy flags changed, since those cause an IP failover
2814                    but not a recovery.
2815                    If the node became disconnected or banned this will also
2816                    lead to an IP address failover, but that is handled
2817                    during recovery.
2818                 */
2819                 if (disabled_flag_changed) {
2820                         rec->need_takeover_run = true;
2821                 }
2822         }
2823
2824         talloc_free(tmp_ctx);
2825 }
2826
2827 /*
2828   handler for when we need to push out flag changes to all other nodes
2829 */
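     /* The recovery master's nodemap is treated as authoritative here: we
      * only check that the node exists in it, then fan the original
      * MODIFY_FLAGS payload out to every connected node.
      */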
2830 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2831                             TDB_DATA data, void *private_data)
2832 {
2833         int ret;
2834         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2835         struct ctdb_node_map *nodemap=NULL;
2836         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2837         uint32_t recmaster;
2838         uint32_t *nodes;
2839
2840         /* find the recovery master */
2841         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2842         if (ret != 0) {
2843                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2844                 talloc_free(tmp_ctx);
2845                 return;
2846         }
2847
2848         /* read the node flags from the recmaster */
2849         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2850         if (ret != 0) {
2851                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2852                 talloc_free(tmp_ctx);
2853                 return;
2854         }
2855         if (c->pnn >= nodemap->num) {
2856                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2857                 talloc_free(tmp_ctx);
2858                 return;
2859         }
2860
2861         /* send the flags update to all connected nodes */
2862         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2863
2864         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2865                                       nodes, 0, CONTROL_TIMEOUT(),
2866                                       false, data,
2867                                       NULL, NULL,
2868                                       NULL) != 0) {
2869                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2870
2871                 talloc_free(tmp_ctx);
2872                 return;
2873         }
2874
2875         talloc_free(tmp_ctx);
2876 }
2877
2878
2879 struct verify_recmode_normal_data {
2880         uint32_t count;
2881         enum monitor_result status;
2882 };
2883
2884 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2885 {
2886         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2887
2888
2889         /* one more node has responded with recmode data*/
2890         rmdata->count--;
2891
2892         /* if we failed to get the recmode, then return an error and let
2893            the main loop try again.
2894         */
2895         if (state->state != CTDB_CONTROL_DONE) {
2896                 if (rmdata->status == MONITOR_OK) {
2897                         rmdata->status = MONITOR_FAILED;
2898                 }
2899                 return;
2900         }
2901
2902         /* if we got a response, then the recmode will be stored in the
2903            status field
2904         */
2905         if (state->status != CTDB_RECOVERY_NORMAL) {
2906                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2907                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2908         }
2909
2910         return;
2911 }
2912
2913
2914 /* verify that all nodes are in normal recovery mode */
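     /* This uses the recovery daemon's usual async fan-out pattern: send a
      * GET_RECMODE control to every active node, count the outstanding
      * requests in rmdata->count and pump the event loop until the callback
      * above has drained the count back to zero.
      */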
2915 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2916 {
2917         struct verify_recmode_normal_data *rmdata;
2918         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2919         struct ctdb_client_control_state *state;
2920         enum monitor_result status;
2921         int j;
2922         
2923         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2924         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2925         rmdata->count  = 0;
2926         rmdata->status = MONITOR_OK;
2927
2928         /* loop over all active nodes and send an async getrecmode call to 
2929            them*/
2930         for (j=0; j<nodemap->num; j++) {
2931                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2932                         continue;
2933                 }
2934                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2935                                         CONTROL_TIMEOUT(), 
2936                                         nodemap->nodes[j].pnn);
2937                 if (state == NULL) {
2938                         /* we failed to send the control, treat this as 
2939                            an error and try again next iteration
2940                         */                      
2941                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2942                         talloc_free(mem_ctx);
2943                         return MONITOR_FAILED;
2944                 }
2945
2946                 /* set up the callback functions */
2947                 state->async.fn = verify_recmode_normal_callback;
2948                 state->async.private_data = rmdata;
2949
2950                 /* one more control to wait for to complete */
2951                 rmdata->count++;
2952         }
2953
2954
2955         /* now wait for up to the maximum number of seconds allowed
2956            or until all nodes we expect a response from have replied
2957         */
2958         while (rmdata->count > 0) {
2959                 event_loop_once(ctdb->ev);
2960         }
2961
2962         status = rmdata->status;
2963         talloc_free(mem_ctx);
2964         return status;
2965 }
2966
2967
2968 struct verify_recmaster_data {
2969         struct ctdb_recoverd *rec;
2970         uint32_t count;
2971         uint32_t pnn;
2972         enum monitor_result status;
2973 };
2974
2975 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2976 {
2977         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2978
2979
2980         /* one more node has responded with recmaster data*/
2981         rmdata->count--;
2982
2983         /* if we failed to get the recmaster, then return an error and let
2984            the main loop try again.
2985         */
2986         if (state->state != CTDB_CONTROL_DONE) {
2987                 if (rmdata->status == MONITOR_OK) {
2988                         rmdata->status = MONITOR_FAILED;
2989                 }
2990                 return;
2991         }
2992
2993         /* if we got a response, then the recmaster will be stored in the
2994            status field
2995         */
2996         if (state->status != rmdata->pnn) {
2997                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2998                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2999                 rmdata->status = MONITOR_ELECTION_NEEDED;
3000         }
3001
3002         return;
3003 }
3004
3005
3006 /* verify that all nodes agree that we are the recmaster */
3007 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3008 {
3009         struct ctdb_context *ctdb = rec->ctdb;
3010         struct verify_recmaster_data *rmdata;
3011         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3012         struct ctdb_client_control_state *state;
3013         enum monitor_result status;
3014         int j;
3015         
3016         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3017         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3018         rmdata->rec    = rec;
3019         rmdata->count  = 0;
3020         rmdata->pnn    = pnn;
3021         rmdata->status = MONITOR_OK;
3022
3023         /* loop over all active nodes and send an async getrecmaster call to 
3024            them*/
3025         for (j=0; j<nodemap->num; j++) {
3026                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3027                         continue;
3028                 }
3029                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3030                                         CONTROL_TIMEOUT(),
3031                                         nodemap->nodes[j].pnn);
3032                 if (state == NULL) {
3033                         /* we failed to send the control, treat this as 
3034                            an error and try again next iteration
3035                         */                      
3036                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3037                         talloc_free(mem_ctx);
3038                         return MONITOR_FAILED;
3039                 }
3040
3041                 /* set up the callback functions */
3042                 state->async.fn = verify_recmaster_callback;
3043                 state->async.private_data = rmdata;
3044
3045                 /* one more control to wait for to complete */
3046                 rmdata->count++;
3047         }
3048
3049
3050         /* now wait for up to the maximum number of seconds allowed
3051            or until all nodes we expect a response from have replied
3052         */
3053         while (rmdata->count > 0) {
3054                 event_loop_once(ctdb->ev);
3055         }
3056
3057         status = rmdata->status;
3058         talloc_free(mem_ctx);
3059         return status;
3060 }
3061
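     /* Compare the locally cached interface list against a fresh
      * GET_IFACES result from the local daemon.  A change in interface
      * count, name or link state counts as a change, which the caller uses
      * to force a takeover run.  The fresh list then replaces the cache.
      */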
3062 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3063                                     struct ctdb_recoverd *rec)
3064 {
3065         struct ctdb_control_get_ifaces *ifaces = NULL;
3066         TALLOC_CTX *mem_ctx;
3067         bool ret = false;
3068
3069         mem_ctx = talloc_new(NULL);
3070
3071         /* Read the interfaces from the local node */
3072         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3073                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3074                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3075                 /* We could return an error.  However, this will be
3076                  * rare so we'll decide that the interfaces have
3077                  * actually changed, just in case.
3078                  */
3079                 talloc_free(mem_ctx);
3080                 return true;
3081         }
3082
3083         if (!rec->ifaces) {
3084                 /* We haven't been here before so things have changed */
3085                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3086                 ret = true;
3087         } else if (rec->ifaces->num != ifaces->num) {
3088                 /* Number of interfaces has changed */
3089                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3090                                      rec->ifaces->num, ifaces->num));
3091                 ret = true;
3092         } else {
3093                 /* See if interface names or link states have changed */
3094                 int i;
3095                 for (i = 0; i < rec->ifaces->num; i++) {
3096                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3097                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3098                                 DEBUG(DEBUG_NOTICE,
3099                                       ("Interface in slot %d changed: %s => %s\n",
3100                                        i, iface->name, ifaces->ifaces[i].name));
3101                                 ret = true;
3102                                 break;
3103                         }
3104                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3105                                 DEBUG(DEBUG_NOTICE,
3106                                       ("Interface %s changed state: %d => %d\n",
3107                                        iface->name, iface->link_state,
3108                                        ifaces->ifaces[i].link_state));
3109                                 ret = true;
3110                                 break;
3111                         }
3112                 }
3113         }
3114
3115         talloc_free(rec->ifaces);
3116         rec->ifaces = talloc_steal(rec, ifaces);
3117
3118         talloc_free(mem_ctx);
3119         return ret;
3120 }
3121
3122 /* called to check that the local allocation of public ip addresses is ok.
3123 */
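     /* Note that the check below is bracketed by two uptime reads: if the
      * last recovery start/finish timestamps differ between the reads, or a
      * recovery is still in progress, the IP layout may legitimately be in
      * motion and the check is skipped for this iteration.
      */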
3124 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3125 {
3126         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3127         struct ctdb_uptime *uptime1 = NULL;
3128         struct ctdb_uptime *uptime2 = NULL;
3129         int ret, j;
3130         bool need_takeover_run = false;
3131
3132         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3133                                 CTDB_CURRENT_NODE, &uptime1);
3134         if (ret != 0) {
3135                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3136                 talloc_free(mem_ctx);
3137                 return -1;
3138         }
3139
3140         if (interfaces_have_changed(ctdb, rec)) {
3141                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3142                                      "local node %u - force takeover run\n",
3143                                      pnn));
3144                 need_takeover_run = true;
3145         }
3146
3147         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3148                                 CTDB_CURRENT_NODE, &uptime2);
3149         if (ret != 0) {
3150                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3151                 talloc_free(mem_ctx);
3152                 return -1;
3153         }
3154
3155         /* skip the check if the startrecovery time has changed */
3156         if (timeval_compare(&uptime1->last_recovery_started,
3157                             &uptime2->last_recovery_started) != 0) {
3158                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3159                 talloc_free(mem_ctx);
3160                 return 0;
3161         }
3162
3163         /* skip the check if the endrecovery time has changed */
3164         if (timeval_compare(&uptime1->last_recovery_finished,
3165                             &uptime2->last_recovery_finished) != 0) {
3166                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3167                 talloc_free(mem_ctx);
3168                 return 0;
3169         }
3170
3171         /* skip the check if we have started but not finished recovery */
3172         if (timeval_compare(&uptime1->last_recovery_finished,
3173                             &uptime1->last_recovery_started) != 1) {
3174                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3175                 talloc_free(mem_ctx);
3176
3177                 return 0;
3178         }
3179
3180         /* Verify that we hold the public IP addresses we should hold
3181            and that we don't hold ones we shouldn't.
3182            If we find an inconsistency we flag that a takeover run is
3183            needed and ask the recovery master to reallocate the public
3184            IP addresses.
3185            Also, if an address has pnn == -1 and we are healthy and could
3186            host it, we request an IP reallocation as well.
3187         */
3188         if (ctdb->tunable.disable_ip_failover == 0) {
3189                 struct ctdb_all_public_ips *ips = NULL;
3190
3191                 /* read the *available* IPs from the local node */
3192                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3193                 if (ret != 0) {
3194                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3195                         talloc_free(mem_ctx);
3196                         return -1;
3197                 }
3198
3199                 for (j=0; j<ips->num; j++) {
3200                         if (ips->ips[j].pnn == -1 &&
3201                             nodemap->nodes[pnn].flags == 0) {
3202                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3203                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3204                                 need_takeover_run = true;
3205                         }
3206                 }
3207
3208                 talloc_free(ips);
3209
3210                 /* read the *known* IPs from the local node */
3211                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3212                 if (ret != 0) {
3213                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3214                         talloc_free(mem_ctx);
3215                         return -1;
3216                 }
3217
3218                 for (j=0; j<ips->num; j++) {
3219                         if (ips->ips[j].pnn == pnn) {
3220                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3221                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3222                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3223                                         need_takeover_run = true;
3224                                 }
3225                         } else {
3226                                 if (ctdb->do_checkpublicip &&
3227                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3228
3229                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3230                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3231
3232                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3233                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3234                                         }
3235                                 }
3236                         }
3237                 }
3238         }
3239
3240         if (need_takeover_run) {
3241                 struct srvid_request rd;
3242                 TDB_DATA data;
3243
3244                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3245
3246                 rd.pnn = ctdb->pnn;
3247                 rd.srvid = 0;
3248                 data.dptr = (uint8_t *)&rd;
3249                 data.dsize = sizeof(rd);
3250
3251                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3252                 if (ret != 0) {
3253                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3254                 }
3255         }
3256         talloc_free(mem_ctx);
3257         return 0;
3258 }
3259
3260
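     /* Callback for the async GET_NODEMAP fan-out below: each reply is
      * stolen into the remote_nodemaps array, indexed by the replying
      * node's PNN.  Entries for nodes that never reply remain NULL.
      */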
3261 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3262 {
3263         struct ctdb_node_map **remote_nodemaps = callback_data;
3264
3265         if (node_pnn >= ctdb->num_nodes) {
3266                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3267                 return;
3268         }
3269
3270         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3271
3272 }
3273
3274 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3275         struct ctdb_node_map *nodemap,
3276         struct ctdb_node_map **remote_nodemaps)
3277 {
3278         uint32_t *nodes;
3279
3280         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3281         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3282                                         nodes, 0,
3283                                         CONTROL_TIMEOUT(), false, tdb_null,
3284                                         async_getnodemap_callback,
3285                                         NULL,
3286                                         remote_nodemaps) != 0) {
3287                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3288
3289                 return -1;
3290         }
3291
3292         return 0;
3293 }
3294
3295 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3296 struct ctdb_check_reclock_state {
3297         struct ctdb_context *ctdb;
3298         struct timeval start_time;
3299         int fd[2];
3300         pid_t child;
3301         struct timed_event *te;
3302         struct fd_event *fde;
3303         enum reclock_child_status status;
3304 };
3305
3306 /* when we free the reclock state we must kill any child process.
3307 */
3308 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3309 {
3310         struct ctdb_context *ctdb = state->ctdb;
3311
3312         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3313
3314         if (state->fd[0] != -1) {
3315                 close(state->fd[0]);
3316                 state->fd[0] = -1;
3317         }
3318         if (state->fd[1] != -1) {
3319                 close(state->fd[1]);
3320                 state->fd[1] = -1;
3321         }
3322         ctdb_kill(ctdb, state->child, SIGKILL);
3323         return 0;
3324 }
3325
3326 /*
3327   called if our check_reclock child times out. this would happen if
3328   i/o to the reclock file blocks.
3329  */
3330 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3331                                          struct timeval t, void *private_data)
3332 {
3333         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3334                                            struct ctdb_check_reclock_state);
3335
3336         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3337         state->status = RECLOCK_TIMEOUT;
3338 }
3339
3340 /* this is called when the child process has completed checking the reclock
3341    file and has written data back to us through the pipe.
3342 */
3343 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3344                              uint16_t flags, void *private_data)
3345 {
3346         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3347                                              struct ctdb_check_reclock_state);
3348         char c = 0;
3349         int ret;
3350
3351         /* we got a response from our child process so we can abort the
3352            timeout.
3353         */
3354         talloc_free(state->te);
3355         state->te = NULL;
3356
3357         ret = read(state->fd[0], &c, 1);
3358         if (ret != 1 || c != RECLOCK_OK) {
3359                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3360                 state->status = RECLOCK_FAILED;
3361
3362                 return;
3363         }
3364
3365         state->status = RECLOCK_OK;
3366         return;
3367 }
3368
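     /* Check that the recovery lock is still usable.  A child process is
      * forked to pread() one byte from the already-held lock fd and report
      * the result back through a pipe; the parent pumps the event loop
      * until an answer arrives or the 15 second timeout fires.  A timeout
      * here usually means the cluster filesystem holding the reclock is
      * slow or hung.
      */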
3369 static int check_recovery_lock(struct ctdb_context *ctdb)
3370 {
3371         int ret;
3372         struct ctdb_check_reclock_state *state;
3373         pid_t parent = getpid();
3374
3375         if (ctdb->recovery_lock_fd == -1) {
3376                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3377                 return -1;
3378         }
3379
3380         state = talloc(ctdb, struct ctdb_check_reclock_state);
3381         CTDB_NO_MEMORY(ctdb, state);
3382
3383         state->ctdb = ctdb;
3384         state->start_time = timeval_current();
3385         state->status = RECLOCK_CHECKING;
3386         state->fd[0] = -1;
3387         state->fd[1] = -1;
3388
3389         ret = pipe(state->fd);
3390         if (ret != 0) {
3391                 talloc_free(state);
3392                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3393                 return -1;
3394         }
3395
3396         state->child = ctdb_fork(ctdb);
3397         if (state->child == (pid_t)-1) {
3398                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3399                 close(state->fd[0]);
3400                 state->fd[0] = -1;
3401                 close(state->fd[1]);
3402                 state->fd[1] = -1;
3403                 talloc_free(state);
3404                 return -1;
3405         }
3406
3407         if (state->child == 0) {
3408                 char cc = RECLOCK_OK;
3409                 close(state->fd[0]);
3410                 state->fd[0] = -1;
3411
3412                 ctdb_set_process_name("ctdb_rec_reclock");
3413                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3414                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3415                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3416                         cc = RECLOCK_FAILED;
3417                 }
3418
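                      /* Note: the return value of write() is not checked;
                       * if no status byte reaches the parent, its 15 second
                       * timeout will eventually fire instead.
                       */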
3419                 write(state->fd[1], &cc, 1);
3420                 /* make sure we die when our parent dies */
3421                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3422                         sleep(5);
3423                 }
3424                 _exit(0);
3425         }
3426         close(state->fd[1]);
3427         state->fd[1] = -1;
3428         set_close_on_exec(state->fd[0]);
3429
3430         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3431
3432         talloc_set_destructor(state, check_reclock_destructor);
3433
3434         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3435                                     ctdb_check_reclock_timeout, state);
3436         if (state->te == NULL) {
3437                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3438                 talloc_free(state);
3439                 return -1;
3440         }
3441
3442         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3443                                 EVENT_FD_READ,
3444                                 reclock_child_handler,
3445                                 (void *)state);
3446
3447         if (state->fde == NULL) {
3448                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3449                 talloc_free(state);
3450                 return -1;
3451         }
3452         tevent_fd_set_auto_close(state->fde);
3453
3454         while (state->status == RECLOCK_CHECKING) {
3455                 event_loop_once(ctdb->ev);
3456         }
3457
3458         if (state->status == RECLOCK_FAILED) {
3459                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3460                 close(ctdb->recovery_lock_fd);
3461                 ctdb->recovery_lock_fd = -1;
3462                 talloc_free(state);
3463                 return -1;
3464         }
3465
3466         talloc_free(state);
3467         return 0;
3468 }
3469
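     /* Re-fetch the reclock path from the main daemon and reconcile it with
      * our cached copy, handling the "reclock disabled", "first time seen"
      * and "path changed" cases.  When the path disappears or changes, the
      * old fd is closed and lock verification is switched off for now.
      */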
3470 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3471 {
3472         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3473         const char *reclockfile;
3474
3475         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3476                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3477                 talloc_free(tmp_ctx);
3478                 return -1;      
3479         }
3480
3481         if (reclockfile == NULL) {
3482                 if (ctdb->recovery_lock_file != NULL) {
3483                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3484                         talloc_free(ctdb->recovery_lock_file);
3485                         ctdb->recovery_lock_file = NULL;
3486                         if (ctdb->recovery_lock_fd != -1) {
3487                                 close(ctdb->recovery_lock_fd);
3488                                 ctdb->recovery_lock_fd = -1;
3489                         }
3490                 }
3491                 ctdb->tunable.verify_recovery_lock = 0;
3492                 talloc_free(tmp_ctx);
3493                 return 0;
3494         }
3495
3496         if (ctdb->recovery_lock_file == NULL) {
3497                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3498                 if (ctdb->recovery_lock_fd != -1) {
3499                         close(ctdb->recovery_lock_fd);
3500                         ctdb->recovery_lock_fd = -1;
3501                 }
3502                 talloc_free(tmp_ctx);
3503                 return 0;
3504         }
3505
3506
3507         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3508                 talloc_free(tmp_ctx);
3509                 return 0;
3510         }
3511
3512         talloc_free(ctdb->recovery_lock_file);
3513         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3514         ctdb->tunable.verify_recovery_lock = 0;
3515         if (ctdb->recovery_lock_fd != -1) {
3516                 close(ctdb->recovery_lock_fd);
3517                 ctdb->recovery_lock_fd = -1;
3518         }
3519
3520         talloc_free(tmp_ctx);
3521         return 0;
3522 }
3523
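     /* One iteration of the recovery daemon's monitoring loop.  Roughly:
      * make sure the main daemon is alive, refresh tunables and reclock
      * settings, fetch the local nodemap and vnnmap, make sure a sane
      * recovery master exists (forcing an election if not) and - only if we
      * are the recovery master - verify cluster-wide consistency of flags,
      * nodemaps and vnnmaps, triggering recoveries, takeover runs and
      * elections as needed.
      */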
3524 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3525                       TALLOC_CTX *mem_ctx)
3526 {
3527         uint32_t pnn;
3528         struct ctdb_node_map *nodemap=NULL;
3529         struct ctdb_node_map *recmaster_nodemap=NULL;
3530         struct ctdb_node_map **remote_nodemaps=NULL;
3531         struct ctdb_vnn_map *vnnmap=NULL;
3532         struct ctdb_vnn_map *remote_vnnmap=NULL;
3533         int32_t debug_level;
3534         int i, j, ret;
3535         bool self_ban;
3536
3537
3538         /* verify that the main daemon is still running */
3539         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3540                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3541                 exit(-1);
3542         }
3543
3544         /* ping the local daemon to tell it we are alive */
3545         ctdb_ctrl_recd_ping(ctdb);
3546
3547         if (rec->election_timeout) {
3548                 /* an election is in progress */
3549                 return;
3550         }
3551
3552         /* read the debug level from the parent and update locally */
3553         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3554         if (ret !=0) {
3555                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3556                 return;
3557         }
3558         LogLevel = debug_level;
3559
3560         /* get relevant tunables */
3561         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3562         if (ret != 0) {
3563                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3564                 return;
3565         }
3566
3567         /* get the current recovery lock file from the server */
3568         if (update_recovery_lock_file(ctdb) != 0) {
3569                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3570                 return;
3571         }
3572
3573         /* Make sure that if recovery lock verification has been disabled
3574            then we close the recovery lock file
3575         */
3576         if (ctdb->tunable.verify_recovery_lock == 0) {
3577                 if (ctdb->recovery_lock_fd != -1) {
3578                         close(ctdb->recovery_lock_fd);
3579                         ctdb->recovery_lock_fd = -1;
3580                 }
3581         }
3582
3583         pnn = ctdb_get_pnn(ctdb);
3584
3585         /* get the vnnmap */
3586         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3587         if (ret != 0) {
3588                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3589                 return;
3590         }
3591
3592
3593         /* get number of nodes */
3594         if (rec->nodemap) {
3595                 talloc_free(rec->nodemap);
3596                 rec->nodemap = NULL;
3597                 nodemap=NULL;
3598         }
3599         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3600         if (ret != 0) {
3601                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3602                 return;
3603         }
3604         nodemap = rec->nodemap;
3605
3606         /* remember our own node flags */
3607         rec->node_flags = nodemap->nodes[pnn].flags;
3608
3609         ban_misbehaving_nodes(rec, &self_ban);
3610         if (self_ban) {
3611                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3612                 return;
3613         }
3614
3615         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3616            also frozen and that the recmode is set to active.
3617         */
3618         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3619                 /* If this node has become inactive then we want to
3620                  * reduce the chances of it taking over the recovery
3621                  * master role when it becomes active again.  This
3622                  * helps to stabilise the recovery master role so that
3623                  * it stays on the most stable node.
3624                  */
3625                 rec->priority_time = timeval_current();
3626
3627                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3628                 if (ret != 0) {
3629                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3630                 }
3631                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3632                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3633
3634                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3635                         if (ret != 0) {
3636                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3637                                 return;
3638                         }
3639                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3640                         if (ret != 0) {
3641                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3642
3643                                 return;
3644                         }
3645                 }
3646
3647                 /* If this node is stopped or banned then it is not the recovery
3648                  * master, so don't do anything. This prevents stopped or banned
3649                  * node from starting election and sending unnecessary controls.
3650                  */
3651                 return;
3652         }
3653
3654         /* check which node is the recovery master */
3655         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3656         if (ret != 0) {
3657                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3658                 return;
3659         }
3660
3661         /* If we are not the recmaster then do some housekeeping */
3662         if (rec->recmaster != pnn) {
3663                 /* Ignore any IP reallocate requests - only recmaster
3664                  * processes them
3665                  */
3666                 TALLOC_FREE(rec->reallocate_requests);
3667                 /* Clear any nodes that should be force rebalanced in
3668                  * the next takeover run.  If the recovery master role
3669                  * has moved then we don't want to process these some
3670                  * time in the future.
3671                  */
3672                 TALLOC_FREE(rec->force_rebalance_nodes);
3673         }
3674
3675         /* This is a special case.  When recovery daemon is started, recmaster
3676          * is set to -1.  If this node was not started in the stopped state,
3677          * then start an election to decide the recovery master.
3678          */
3679         if (rec->recmaster == (uint32_t)-1) {
3680                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3681                 force_election(rec, pnn, nodemap);
3682                 return;
3683         }
3684
3685         /* update the capabilities for all nodes */
3686         ret = update_capabilities(ctdb, nodemap);
3687         if (ret != 0) {
3688                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3689                 return;
3690         }
3691
3692         /*
3693          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3694          * but we have, then force an election and try to become the new
3695          * recmaster.
3696          */
3697         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3698             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3699              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3700                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3701                                   " but we (node %u) have - force an election\n",
3702                                   rec->recmaster, pnn));
3703                 force_election(rec, pnn, nodemap);
3704                 return;
3705         }
3706
3707         /* count how many active nodes there are */
3708         rec->num_active    = 0;
3709         rec->num_lmasters  = 0;
3710         rec->num_connected = 0;
3711         for (i=0; i<nodemap->num; i++) {
3712                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3713                         rec->num_active++;
3714                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3715                                 rec->num_lmasters++;
3716                         }
3717                 }
3718                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3719                         rec->num_connected++;
3720                 }
3721         }
3722
3723
3724         /* verify that the recmaster node is still active */
3725         for (j=0; j<nodemap->num; j++) {
3726                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3727                         break;
3728                 }
3729         }
3730
3731         if (j == nodemap->num) {
3732                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3733                 force_election(rec, pnn, nodemap);
3734                 return;
3735         }
3736
3737         /* if recovery master is disconnected we must elect a new recmaster */
3738         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3739                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3740                 force_election(rec, pnn, nodemap);
3741                 return;
3742         }
3743
3744         /* get nodemap from the recovery master to check if it is inactive */
3745         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3746                                    mem_ctx, &recmaster_nodemap);
3747         if (ret != 0) {
3748                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3749                           nodemap->nodes[j].pnn));
3750                 return;
3751         }
3752
3753
3754         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3755             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3756                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3757                 /*
3758                  * update our nodemap to carry the recmaster's notion of
3759                  * its own flags, so that we don't keep freezing the
3760                  * inactive recmaster node...
3761                  */
3762                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3763                 force_election(rec, pnn, nodemap);
3764                 return;
3765         }
3766
3767         /* verify that we have all ip addresses we should have and we don't
3768          * have addresses we shouldn't have.
3769          */
3770         if (ctdb->tunable.disable_ip_failover == 0 &&
3771             rec->takeover_runs_disable_ctx == NULL) {
3772                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3773                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3774                 }
3775         }
3776
3777
3778         /* if we are not the recmaster then we do not need to check
3779            if recovery is needed
3780          */
3781         if (pnn != rec->recmaster) {
3782                 return;
3783         }
3784
3785
3786         /* ensure our local copies of flags are right */
3787         ret = update_local_flags(rec, nodemap);
3788         if (ret == MONITOR_ELECTION_NEEDED) {
3789                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3790                 force_election(rec, pnn, nodemap);
3791                 return;
3792         }
3793         if (ret != MONITOR_OK) {
3794                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3795                 return;
3796         }
3797
3798         if (ctdb->num_nodes != nodemap->num) {
3799                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3800                 reload_nodes_file(ctdb);
3801                 return;
3802         }
3803
3804         /* verify that all active nodes agree that we are the recmaster */
3805         switch (verify_recmaster(rec, nodemap, pnn)) {
3806         case MONITOR_RECOVERY_NEEDED:
3807                 /* can not happen */
3808                 return;
3809         case MONITOR_ELECTION_NEEDED:
3810                 force_election(rec, pnn, nodemap);
3811                 return;
3812         case MONITOR_OK:
3813                 break;
3814         case MONITOR_FAILED:
3815                 return;
3816         }
3817
3818
3819         if (rec->need_recovery) {
3820                 /* a previous recovery didn't finish */
3821                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3822                 return;
3823         }
3824
3825         /* verify that all active nodes are in normal mode 
3826            and not in recovery mode 
3827         */
3828         switch (verify_recmode(ctdb, nodemap)) {
3829         case MONITOR_RECOVERY_NEEDED:
3830                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3831                 return;
3832         case MONITOR_FAILED:
3833                 return;
3834         case MONITOR_ELECTION_NEEDED:
3835                 /* can not happen */
3836         case MONITOR_OK:
3837                 break;
3838         }
3839
3840
3841         if (ctdb->tunable.verify_recovery_lock != 0) {
3842                 /* we should have the reclock - check it's not stale */
3843                 ret = check_recovery_lock(ctdb);
3844                 if (ret != 0) {
3845                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3846                         ctdb_set_culprit(rec, ctdb->pnn);
3847                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3848                         return;
3849                 }
3850         }
3851
3852
3853         /* if there are takeovers requested, perform them and notify the waiters */
3854         if (rec->takeover_runs_disable_ctx == NULL &&
3855             rec->reallocate_requests) {
3856                 process_ipreallocate_requests(ctdb, rec);
3857         }
3858
3859         /* get the nodemap for all active remote nodes
3860          */
3861         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3862         if (remote_nodemaps == NULL) {
3863                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3864                 return;
3865         }
3866         for(i=0; i<nodemap->num; i++) {
3867                 remote_nodemaps[i] = NULL;
3868         }
3869         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3870                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3871                 return;
3872         } 
3873
3874         /* verify that all other nodes have the same nodemap as we have
3875         */
3876         for (j=0; j<nodemap->num; j++) {
3877                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3878                         continue;
3879                 }
3880
3881                 if (remote_nodemaps[j] == NULL) {
3882                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3883                         ctdb_set_culprit(rec, j);
3884
3885                         return;
3886                 }
3887
3888                 /* if the nodes disagree on how many nodes there are
3889                    then this is a good reason to try recovery
3890                  */
3891                 if (remote_nodemaps[j]->num != nodemap->num) {
3892                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3893                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3894                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3895                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3896                         return;
3897                 }
3898
3899                 /* if the nodes disagree on which nodes exist and are
3900                    active, then that is also a good reason to do recovery
3901                  */
3902                 for (i=0;i<nodemap->num;i++) {
3903                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3904                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3905                                           nodemap->nodes[j].pnn, i, 
3906                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3907                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3908                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3909                                             vnnmap);
3910                                 return;
3911                         }
3912                 }
3913         }
3914
3915         /*
3916          * Update node flags obtained from each active node. This ensures we have
3917          * up-to-date information for all the nodes.
3918          */
3919         for (j=0; j<nodemap->num; j++) {
3920                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3921                         continue;
3922                 }
3923                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3924         }
3925
3926         for (j=0; j<nodemap->num; j++) {
3927                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3928                         continue;
3929                 }
3930
3931                 /* verify the flags are consistent
3932                 */
3933                 for (i=0; i<nodemap->num; i++) {
3934                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3935                                 continue;
3936                         }
3937                         
3938                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3939                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3940                                   nodemap->nodes[j].pnn, 
3941                                   nodemap->nodes[i].pnn, 
3942                                   remote_nodemaps[j]->nodes[i].flags,
3943                                   nodemap->nodes[i].flags));
3944                                 if (i == j) {
3945                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3946                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3947                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3948                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3949                                                     vnnmap);
3950                                         return;
3951                                 } else {
3952                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3953                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3954                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3955                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3956                                                     vnnmap);
3957                                         return;
3958                                 }
3959                         }
3960                 }
3961         }
3962
3963
3964         /* There must be the same number of lmasters in the vnn map as
3965          * there are active nodes with the lmaster capability...  or
3966          * do a recovery.
3967          */
3968         if (vnnmap->size != rec->num_lmasters) {
3969                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3970                           vnnmap->size, rec->num_lmasters));
3971                 ctdb_set_culprit(rec, ctdb->pnn);
3972                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3973                 return;
3974         }
3975
3976         /* verify that all active nodes in the nodemap also exist in 
3977            the vnnmap.
3978          */
3979         for (j=0; j<nodemap->num; j++) {
3980                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3981                         continue;
3982                 }
3983                 if (nodemap->nodes[j].pnn == pnn) {
3984                         continue;
3985                 }
3986
3987                 for (i=0; i<vnnmap->size; i++) {
3988                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3989                                 break;
3990                         }
3991                 }
3992                 if (i == vnnmap->size) {
3993                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3994                                   nodemap->nodes[j].pnn));
3995                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3996                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3997                         return;
3998                 }
3999         }
4000
4001         
4002         /* verify that all other nodes have the same vnnmap
4003            and are from the same generation
4004          */
4005         for (j=0; j<nodemap->num; j++) {
4006                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4007                         continue;
4008                 }
4009                 if (nodemap->nodes[j].pnn == pnn) {
4010                         continue;
4011                 }
4012
4013                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4014                                           mem_ctx, &remote_vnnmap);
4015                 if (ret != 0) {
4016                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4017                                   nodemap->nodes[j].pnn));
4018                         return;
4019                 }
4020
4021                 /* verify the vnnmap generation is the same */
4022                 if (vnnmap->generation != remote_vnnmap->generation) {
4023                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4024                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4025                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4026                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4027                         return;
4028                 }
4029
4030                 /* verify the vnnmap size is the same */
4031                 if (vnnmap->size != remote_vnnmap->size) {
4032                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap size: %u vs %u (ours)\n",
4033                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4034                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4035                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4036                         return;
4037                 }
4038
4039                 /* verify the vnnmap is the same */
4040                 for (i=0; i<vnnmap->size; i++) {
4041                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4042                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap.\n",
4043                                           nodemap->nodes[j].pnn));
4044                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4045                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4046                                             vnnmap);
4047                                 return;
4048                         }
4049                 }
4050         }
4051
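        /* All of the consistency checks above have passed, so no
         * database recovery is needed on this pass; what may remain
         * is a public IP takeover run. */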
4052         /* we might need to change who has what IP assigned */
4053         if (rec->need_takeover_run) {
4054                 uint32_t culprit = (uint32_t)-1;
4055
4056                 rec->need_takeover_run = false;
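                /* Clear the flag optimistically; it is set back to true
                 * below if reloading the remote public IPs fails, so the
                 * takeover run is retried on the next pass of the loop. */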
4057
4058                 /* update the list of public ips that a node can handle for
4059                    all connected nodes
4060                 */
4061                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4062                 if (ret != 0) {
4063                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4064                                          culprit));
4065                         rec->need_takeover_run = true;
4066                         return;
4067                 }
4068
4069                 /* execute the "startrecovery" event script on all nodes */
4070                 ret = run_startrecovery_eventscript(rec, nodemap);
4071                 if (ret!=0) {
4072                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4073                         ctdb_set_culprit(rec, ctdb->pnn);
4074                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4075                         return;
4076                 }
4077
4078                 /* If the takeover run fails, the offending nodes get
4079                  * ban culprit counts and the takeover is retried; if
4080                  * it keeps failing, those nodes eventually get banned.
4081                  *
4082                  * If rec->need_takeover_run were not set back to true
4083                  * on such a failure, monitoring (disabled cluster-wide
4084                  * via the startrecovery eventscript) would never be
4085                  * re-enabled.
4086                  */
4087                 if (!do_takeover_run(rec, nodemap, true)) {
4088                         return;
4089                 }
4090
4091                 /* execute the "recovered" event script on all nodes */
4092                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4093 #if 0
4094 // We can't check whether the event completed successfully,
4095 // since this script WILL fail if the node is in recovery mode,
4096 // and if that race happens the code here would just cause a
4097 // second, cascading recovery.
4098                 if (ret!=0) {
4099                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4100                         ctdb_set_culprit(rec, ctdb->pnn);
4101                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4102                 }
4103 #endif
4104         }
4105 }
4106
4107 /*
4108   the main monitoring loop
4109  */
4110 static void monitor_cluster(struct ctdb_context *ctdb)
4111 {
4112         struct ctdb_recoverd *rec;
4113
4114         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4115
4116         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4117         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4118
4119         rec->ctdb = ctdb;
4120
4121         rec->takeover_run_in_progress = false;
4122
4123         rec->priority_time = timeval_current();
4124
4125         /* register a message port for sending memory dumps */
4126         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4127
4128         /* register a message port for requesting logs */
4129         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4130
4131         /* register a message port for clearing logs */
4132         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4133
4134         /* register a message port for recovery elections */
4135         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4136
4137         /* when nodes are disabled/enabled */
4138         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4139
4140         /* when we are asked to push out a flag change */
4141         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4142
4143         /* register a message port for vacuum fetch */
4144         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4145
4146         /* register a message port for reloadnodes  */
4147         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4148
4149         /* register a message port for performing a takeover run */
4150         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4151
4152         /* register a message port for disabling the ip check for a short while */
4153         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4154
4155         /* register a message port for updating the recovery daemons node assignment for an ip */
4156         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4157
4158         /* register a message port for forcing a rebalance of a node next
4159            reallocation */
4160         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4161
4162         /* Register a message port for disabling takeover runs */
4163         ctdb_client_set_message_handler(ctdb,
4164                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4165                                         disable_takeover_runs_handler, rec);
4166
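        /* Everything allocated during one pass of the loop below hangs
         * off a fresh talloc context, so a single talloc_free() at the
         * end of the pass reclaims all temporary state. */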
4167         for (;;) {
4168                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4169                 struct timeval start;
4170                 double elapsed;
4171
4172                 if (!mem_ctx) {
4173                         DEBUG(DEBUG_CRIT,(__location__
4174                                           " Failed to create temp context\n"));
4175                         exit(-1);
4176                 }
4177
4178                 start = timeval_current();
4179                 main_loop(ctdb, rec, mem_ctx);
4180                 talloc_free(mem_ctx);
4181
4182                 /* we only check for recovery once every recover_interval seconds */
4183                 elapsed = timeval_elapsed(&start);
4184                 if (elapsed < ctdb->tunable.recover_interval) {
4185                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4186                                           - elapsed);
4187                 }
4188         }
4189 }
4190
4191 /*
4192   event handler for when the main ctdbd dies
4193  */
4194 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4195                                  uint16_t flags, void *private_data)
4196 {
4197         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4198         _exit(1);
4199 }
4200
4201 /*
4202   called regularly to verify that the recovery daemon is still running
4203  */
4204 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4205                               struct timeval yt, void *p)
4206 {
4207         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4208
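        /* Signal 0 delivers nothing; it merely probes whether the
         * recovery daemon process still exists. */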
4209         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4210                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4211
4212                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4213                                 ctdb_restart_recd, ctdb);
4214
4215                 return;
4216         }
4217
4218         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4219                         timeval_current_ofs(30, 0),
4220                         ctdb_check_recd, ctdb);
4221 }
4222
4223 static void recd_sig_child_handler(struct event_context *ev,
4224         struct signal_event *se, int signum, int count,
4225         void *dont_care, 
4226         void *private_data)
4227 {
4228 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4229         int status;
4230         pid_t pid = -1;
4231
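        /* Reap every child that has exited: with WNOHANG, waitpid()
         * returns 0 once no more exited children are pending and -1
         * (usually ECHILD) when there are no children at all. */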
4232         while (pid != 0) {
4233                 pid = waitpid(-1, &status, WNOHANG);
4234                 if (pid == -1) {
4235                         if (errno != ECHILD) {
4236                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4237                         }
4238                         return;
4239                 }
4240                 if (pid > 0) {
4241                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4242                 }
4243         }
4244 }
4245
4246 /*
4247   startup the recovery daemon as a child of the main ctdb daemon
4248  */
4249 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4250 {
4251         int fd[2];
4252         struct signal_event *se;
4253         struct tevent_fd *fde;
4254
4255         if (pipe(fd) != 0) {
4256                 return -1;
4257         }
4258
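        /* Record the pid of the main daemon before forking, so the
         * recovery daemon child inherits it. */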
4259         ctdb->ctdbd_pid = getpid();
4260
4261         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4262         if (ctdb->recoverd_pid == -1) {
4263                 return -1;
4264         }
4265
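        /* In the parent: keep the write end of the pipe open for the
         * child to watch, and arm a timer that periodically verifies
         * the recovery daemon is still running. */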
4266         if (ctdb->recoverd_pid != 0) {
4267                 talloc_free(ctdb->recd_ctx);
4268                 ctdb->recd_ctx = talloc_new(ctdb);
4269                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4270
4271                 close(fd[0]);
4272                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4273                                 timeval_current_ofs(30, 0),
4274                                 ctdb_check_recd, ctdb);
4275                 return 0;
4276         }
4277
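        /* Only the child reaches this point: it becomes the recovery
         * daemon.  Close the unused write end of the pipe; the read end
         * is watched below so we notice when the main daemon dies. */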
4278         close(fd[1]);
4279
4280         srandom(getpid() ^ time(NULL));
4281
4282         /* Clear the log ringbuffer */
4283         ctdb_clear_log(ctdb);
4284
4285         ctdb_set_process_name("ctdb_recoverd");
4286         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4287                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: Failed to switch recovery daemon into client mode. Shutting down.\n"));
4288                 exit(1);
4289         }
4290
4291         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4292
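        /* The pipe is only used to detect the death of the main daemon:
         * when it exits, the write end is closed, fd[0] becomes readable
         * (EOF) and ctdb_recoverd_parent() terminates this process. */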
4293         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4294                      ctdb_recoverd_parent, &fd[0]);
4295         tevent_fd_set_auto_close(fde);
4296
4297         /* set up a handler to pick up sigchld */
4298         se = event_add_signal(ctdb->ev, ctdb,
4299                                      SIGCHLD, 0,
4300                                      recd_sig_child_handler,
4301                                      ctdb);
4302         if (se == NULL) {
4303                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4304                 exit(1);
4305         }
4306
4307         monitor_cluster(ctdb);
4308
4309         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4310         return -1;
4311 }
4312
4313 /*
4314   shutdown the recovery daemon
4315  */
4316 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4317 {
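        /* recoverd_pid is 0 in the recovery daemon child itself and,
         * presumably, when no recovery daemon has been started; in
         * either case there is nothing for us to signal. */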
4318         if (ctdb->recoverd_pid == 0) {
4319                 return;
4320         }
4321
4322         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4323         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4324
4325         TALLOC_FREE(ctdb->recd_ctx);
4326         TALLOC_FREE(ctdb->recd_ping_count);
4327 }
4328
4329 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4330                        struct timeval t, void *private_data)
4331 {
4332         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4333
4334         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4335         ctdb_stop_recoverd(ctdb);
4336         ctdb_start_recoverd(ctdb);
4337 }