/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "db_wrap.h"
#include "dlinklist.h"


/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct srvid_request *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct srvid_request *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct srvid_request *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

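/* Usage sketch (illustrative, not from the original file): a message
 * handler that receives a struct srvid_request and wants to defer its
 * reply queues it and later answers the whole batch in one call.  The
 * reallocate_requests list declared below is used this way elsewhere
 * in this daemon:
 *
 *   int32_t status = 0;
 *   TDB_DATA result = { .dptr = (uint8_t *)&status,
 *                       .dsize = sizeof(status) };
 *
 *   srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *   ...
 *   srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 *
 * srvid_requests_reply() frees the list and resets the pointer to NULL,
 * so the same pointer can collect the next batch of requests.
 */
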
struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
        struct srvid_requests *reallocate_requests;
        bool takeover_run_in_progress;
        TALLOC_CTX *takeover_runs_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
        TALLOC_CTX *deferred_rebalance_ctx;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED };


/*
  remember the trouble maker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        /* culprit is used as an index into ctdb->nodes[], so it must be
           strictly less than num_nodes */
        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

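/* Worked example (illustrative): assuming the RecoveryGracePeriod
 * tunable is at its usual default of 120 seconds, a node blamed twice
 * within two minutes accumulates count == 2 and keeps climbing towards
 * a ban.  If it then behaves for longer than the grace period,
 * timeval_elapsed() exceeds the tunable on the next call and the count
 * is reset to 0 before the new credit is added, so only sustained
 * misbehaviour leads to a ban.
 */
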
/*
  remember the trouble maker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
                DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n", (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
                ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
        }

        if (node_pnn == ctdb->pnn) {
                ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
        }
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
{
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

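/* Example of consuming the refreshed capabilities (a sketch, not code
 * from this file): after update_capabilities() succeeds, callers can
 * test per-node capability bits, e.g. to skip nodes that cannot become
 * recovery master:
 *
 *   if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_RECMASTER)) {
 *           continue;
 *   }
 */
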
static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* update all remote nodes to use the same db priorities that we have.
   This can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map *nodemap,
        uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;
        uint32_t *nodes;

        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                TDB_DATA data;
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].dbid;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority));

                data.dptr  = (uint8_t *)&db_prio;
                data.dsize = sizeof(db_prio);

                if (ctdb_client_async_control(ctdb,
                                        CTDB_CONTROL_SET_DB_PRIORITY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
                                                  mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}

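/* Merge-rule illustration (not part of the original code): the loop
 * above keeps whichever copy of each record has the highest RSN.  Given
 * an existing recdb copy with rsn == 5 and an incoming copy with
 * rsn == 7, the incoming record replaces it.  On a tie (rsn == 5 on
 * both sides) the incoming copy only wins when the existing copy's
 * dmaster is not the recovery master, i.e. records already pulled from
 * the recovery master are preferred.
 */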

struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

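        /* The control argument is the database id widened to 8 bytes:
         * outdata[0] carries the dbid and outdata[1] is zero padding,
         * matching the uint64_t-sized blob that GET_DB_SEQNUM appears
         * to expect (an assumption based on the sizes used here, not
         * verified against the control handler). */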
        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }
        }

        return 0;
}


struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
};

static void vacuum_fetch_next(struct vacuum_info *v);

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
        talloc_free(state);
        vacuum_fetch_next(v);
}


/*
  process the next element from the vacuum list
*/
static void vacuum_fetch_next(struct vacuum_info *v)
{
        struct ctdb_call call;
        struct ctdb_rec_data *r;

        while (v->recs->count) {
                struct ctdb_client_call_state *state;
                TDB_DATA data;
                struct ctdb_ltdb_header *hdr;

                ZERO_STRUCT(call);
                call.call_id = CTDB_NULL_FUNC;
                call.flags = CTDB_IMMEDIATE_MIGRATION;
                call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

                r = v->r;
                v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
                v->recs->count--;

                call.key.dptr = &r->data[0];
                call.key.dsize = r->keylen;

                /* ensure we don't block this daemon - just skip a record if we can't get
                   the chainlock */
                if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
                        continue;
                }

                data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
                if (data.dptr == NULL) {
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                hdr = (struct ctdb_ltdb_header *)data.dptr;
                if (hdr->dmaster == v->rec->ctdb->pnn) {
                        /* it's already local */
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                free(data.dptr);

                state = ctdb_call_send(v->ctdb_db, &call);
                tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                if (state == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                        talloc_free(v);
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
                state->async.private_data = v;
                return;
        }

        talloc_free(v);
}


/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
        DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data *r;
        uint32_t srcnode;
        struct vacuum_info *v;

        recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];

        if (recs->count == 0) {
                talloc_free(tmp_ctx);
                return;
        }

        srcnode = r->reqid;

        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
                        talloc_free(tmp_ctx);
                        return;
                }
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                talloc_free(tmp_ctx);
                return;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].dbid == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
                return;
        }

        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(tmp_ctx);
                return;
        }

        v->rec = rec;
        v->srcnode = srcnode;
        v->ctdb_db = ctdb_db;
        v->recs = talloc_memdup(v, recs, data.dsize);
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
                talloc_free(tmp_ctx);
                return;
        }
        v->r = (struct ctdb_rec_data *)&v->recs->data[0];

        DLIST_ADD(rec->vacuum_info, v);

        talloc_set_destructor(v, vacuum_info_destructor);

        vacuum_fetch_next(v);
        talloc_free(tmp_ctx);
}


/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
}

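/* Example (illustrative): ctdb_wait_timeout(ctdb, 0.5) blocks the
 * recovery daemon for half a second while still dispatching pending
 * events, because the wait spins event_loop_once() instead of calling
 * sleep().
 */
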
1125 /*
1126   called when an election times out (ends)
1127  */
1128 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1129                                   struct timeval t, void *p)
1130 {
1131         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1132         rec->election_timeout = NULL;
1133         fast_start = false;
1134
1135         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1136 }
1137
1138
1139 /*
1140   wait for an election to finish. It finished election_timeout seconds after
1141   the last election packet is received
1142  */
1143 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1144 {
1145         struct ctdb_context *ctdb = rec->ctdb;
1146         while (rec->election_timeout) {
1147                 event_loop_once(ctdb->ev);
1148         }
1149 }
1150
1151 /*
1152   Update our local flags from all remote connected nodes. 
1153   This is only run when we are or we belive we are the recovery master
1154  */
1155 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1156 {
1157         int j;
1158         struct ctdb_context *ctdb = rec->ctdb;
1159         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1160
1161         /* get the nodemap for all active remote nodes and verify
1162            they are the same as for this node
1163          */
1164         for (j=0; j<nodemap->num; j++) {
1165                 struct ctdb_node_map *remote_nodemap=NULL;
1166                 int ret;
1167
1168                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1169                         continue;
1170                 }
1171                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1172                         continue;
1173                 }
1174
1175                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1176                                            mem_ctx, &remote_nodemap);
1177                 if (ret != 0) {
1178                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1179                                   nodemap->nodes[j].pnn));
1180                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1181                         talloc_free(mem_ctx);
1182                         return MONITOR_FAILED;
1183                 }
1184                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1185                         /* We should tell our daemon about this so it
1186                            updates its flags or else we will log the same 
1187                            message again in the next iteration of recovery.
1188                            Since we are the recovery master we can just as
1189                            well update the flags on all nodes.
1190                         */
1191                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1192                         if (ret != 0) {
1193                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1194                                 return -1;
1195                         }
1196
1197                         /* Update our local copy of the flags in the recovery
1198                            daemon.
1199                         */
1200                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1201                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1202                                  nodemap->nodes[j].flags));
1203                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1204                 }
1205                 talloc_free(remote_nodemap);
1206         }
1207         talloc_free(mem_ctx);
1208         return MONITOR_OK;
1209 }
1210
1211
1212 /* Create a new random generation ip. 
1213    The generation id can not be the INVALID_GENERATION id
1214 */
1215 static uint32_t new_generation(void)
1216 {
1217         uint32_t generation;
1218
1219         while (1) {
1220                 generation = random();
1221
1222                 if (generation != INVALID_GENERATION) {
1223                         break;
1224                 }
1225         }
1226
1227         return generation;
1228 }
1229
1230
1231 /*
1232   create a temporary working database
1233  */
1234 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1235 {
1236         char *name;
1237         struct tdb_wrap *recdb;
1238         unsigned tdb_flags;
1239
1240         /* open up the temporary recovery database */
1241         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1242                                ctdb->db_directory_state,
1243                                ctdb->pnn);
1244         if (name == NULL) {
1245                 return NULL;
1246         }
1247         unlink(name);
1248
1249         tdb_flags = TDB_NOLOCK;
1250         if (ctdb->valgrinding) {
1251                 tdb_flags |= TDB_NOMMAP;
1252         }
1253         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1254
1255         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1256                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1257         if (recdb == NULL) {
1258                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1259         }
1260
1261         talloc_free(name);
1262
1263         return recdb;
1264 }
1265
1266
1267 /* 
1268    a traverse function for pulling all relevant records from recdb
1269  */
1270 struct recdb_data {
1271         struct ctdb_context *ctdb;
1272         struct ctdb_marshall_buffer *recdata;
1273         uint32_t len;
1274         uint32_t allocated_len;
1275         bool failed;
1276         bool persistent;
1277 };
1278
1279 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1280 {
1281         struct recdb_data *params = (struct recdb_data *)p;
1282         struct ctdb_rec_data *rec;
1283         struct ctdb_ltdb_header *hdr;
1284
1285         /*
1286          * skip empty records - but NOT for persistent databases:
1287          *
1288          * The record-by-record mode of recovery deletes empty records.
1289          * For persistent databases, this can lead to data corruption
1290          * by deleting records that should be there:
1291          *
1292          * - Assume the cluster has been running for a while.
1293          *
1294          * - A record R in a persistent database has been created and
1295          *   deleted a couple of times, the last operation being deletion,
1296          *   leaving an empty record with a high RSN, say 10.
1297          *
1298          * - Now a node N is turned off.
1299          *
1300          * - This leaves the local database copy of D on N with the empty
1301          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1302          *   the copy of record R.
1303          *
1304          * - Now the record is created again while node N is turned off.
1305          *   This creates R with RSN = 1 on all nodes except for N.
1306          *
1307          * - Now node N is turned on again. The following recovery will chose
1308          *   the older empty copy of R due to RSN 10 > RSN 1.
1309          *
1310          * ==> Hence the record is gone after the recovery.
1311          *
1312          * On databases like Samba's registry, this can damage the higher-level
1313          * data structures built from the various tdb-level records.
1314          */
1315         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1316                 return 0;
1317         }
1318
1319         /* update the dmaster field to point to us */
1320         hdr = (struct ctdb_ltdb_header *)data.dptr;
1321         if (!params->persistent) {
1322                 hdr->dmaster = params->ctdb->pnn;
1323                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1324         }
1325
1326         /* add the record to the blob ready to send to the nodes */
1327         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1328         if (rec == NULL) {
1329                 params->failed = true;
1330                 return -1;
1331         }
1332         if (params->len + rec->length >= params->allocated_len) {
1333                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1334                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1335         }
1336         if (params->recdata == NULL) {
1337                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1338                          rec->length + params->len));
1339                 params->failed = true;
1340                 return -1;
1341         }
1342         params->recdata->count++;
1343         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1344         params->len += rec->length;
1345         talloc_free(rec);
1346
1347         return 0;
1348 }
1349
1350 /*
1351   push the recdb database out to all nodes
1352  */
1353 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1354                                bool persistent,
1355                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1356 {
1357         struct recdb_data params;
1358         struct ctdb_marshall_buffer *recdata;
1359         TDB_DATA outdata;
1360         TALLOC_CTX *tmp_ctx;
1361         uint32_t *nodes;
1362
1363         tmp_ctx = talloc_new(ctdb);
1364         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1365
1366         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1367         CTDB_NO_MEMORY(ctdb, recdata);
1368
1369         recdata->db_id = dbid;
1370
1371         params.ctdb = ctdb;
1372         params.recdata = recdata;
1373         params.len = offsetof(struct ctdb_marshall_buffer, data);
1374         params.allocated_len = params.len;
1375         params.failed = false;
1376         params.persistent = persistent;
1377
1378         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1379                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1380                 talloc_free(params.recdata);
1381                 talloc_free(tmp_ctx);
1382                 return -1;
1383         }
1384
1385         if (params.failed) {
1386                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1387                 talloc_free(params.recdata);
1388                 talloc_free(tmp_ctx);
1389                 return -1;              
1390         }
1391
1392         recdata = params.recdata;
1393
1394         outdata.dptr = (void *)recdata;
1395         outdata.dsize = params.len;
1396
1397         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1398         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1399                                         nodes, 0,
1400                                         CONTROL_TIMEOUT(), false, outdata,
1401                                         NULL, NULL,
1402                                         NULL) != 0) {
1403                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1404                 talloc_free(recdata);
1405                 talloc_free(tmp_ctx);
1406                 return -1;
1407         }
1408
1409         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1410                   dbid, recdata->count));
1411
1412         talloc_free(recdata);
1413         talloc_free(tmp_ctx);
1414
1415         return 0;
1416 }
1417
1418
1419 /*
1420   go through a full recovery on one database 
1421  */
1422 static int recover_database(struct ctdb_recoverd *rec, 
1423                             TALLOC_CTX *mem_ctx,
1424                             uint32_t dbid,
1425                             bool persistent,
1426                             uint32_t pnn, 
1427                             struct ctdb_node_map *nodemap,
1428                             uint32_t transaction_id)
1429 {
1430         struct tdb_wrap *recdb;
1431         int ret;
1432         struct ctdb_context *ctdb = rec->ctdb;
1433         TDB_DATA data;
1434         struct ctdb_control_wipe_database w;
1435         uint32_t *nodes;
1436
1437         recdb = create_recdb(ctdb, mem_ctx);
1438         if (recdb == NULL) {
1439                 return -1;
1440         }
1441
1442         /* pull all remote databases onto the recdb */
1443         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1446                 return -1;
1447         }
1448
1449         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1450
1451         /* wipe all the remote databases. This is safe as we are in a transaction */
1452         w.db_id = dbid;
1453         w.transaction_id = transaction_id;
1454
1455         data.dptr = (void *)&w;
1456         data.dsize = sizeof(w);
1457
1458         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1459         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1460                                         nodes, 0,
1461                                         CONTROL_TIMEOUT(), false, data,
1462                                         NULL, NULL,
1463                                         NULL) != 0) {
1464                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1465                 talloc_free(recdb);
1466                 return -1;
1467         }
1468         
1469         /* push out the correct database. This sets the dmaster and skips 
1470            the empty records */
1471         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1472         if (ret != 0) {
1473                 talloc_free(recdb);
1474                 return -1;
1475         }
1476
1477         /* all done with this database */
1478         talloc_free(recdb);
1479
1480         return 0;
1481 }
1482
1483 /*
1484   reload the nodes file 
1485 */
1486 static void reload_nodes_file(struct ctdb_context *ctdb)
1487 {
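        /* Forget the cached node list so that ctdb_load_nodes_file()
           rebuilds it from the nodes file. */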
1488         ctdb->nodes = NULL;
1489         ctdb_load_nodes_file(ctdb);
1490 }
1491
1492 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1493                                          struct ctdb_recoverd *rec,
1494                                          struct ctdb_node_map *nodemap,
1495                                          uint32_t *culprit)
1496 {
1497         int j;
1498         int ret;
1499
1500         if (ctdb->num_nodes != nodemap->num) {
1501                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1502                                   ctdb->num_nodes, nodemap->num));
1503                 if (culprit) {
1504                         *culprit = ctdb->pnn;
1505                 }
1506                 return -1;
1507         }
1508
1509         for (j=0; j<nodemap->num; j++) {
1510                 /* For readability */
1511                 struct ctdb_node *node = ctdb->nodes[j];
1512
1513                 /* release any existing data */
1514                 if (node->known_public_ips) {
1515                         talloc_free(node->known_public_ips);
1516                         node->known_public_ips = NULL;
1517                 }
1518                 if (node->available_public_ips) {
1519                         talloc_free(node->available_public_ips);
1520                         node->available_public_ips = NULL;
1521                 }
1522
1523                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1524                         continue;
1525                 }
1526
1527                 /* Retrieve the list of known public IPs from the node */
1528                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1529                                         CONTROL_TIMEOUT(),
1530                                         node->pnn,
1531                                         ctdb->nodes,
1532                                         0,
1533                                         &node->known_public_ips);
1534                 if (ret != 0) {
1535                         DEBUG(DEBUG_ERR,
1536                               ("Failed to read known public IPs from node: %u\n",
1537                                node->pnn));
1538                         if (culprit) {
1539                                 *culprit = node->pnn;
1540                         }
1541                         return -1;
1542                 }
1543
1544                 if (ctdb->do_checkpublicip &&
1545                     rec->takeover_runs_disable_ctx == NULL &&
1546                     verify_remote_ip_allocation(ctdb,
1547                                                  node->known_public_ips,
1548                                                  node->pnn)) {
1549                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1550                         rec->need_takeover_run = true;
1551                 }
1552
1553                 /* Retrieve the list of available public IPs from the node */
1554                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1555                                         CONTROL_TIMEOUT(),
1556                                         node->pnn,
1557                                         ctdb->nodes,
1558                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1559                                         &node->available_public_ips);
1560                 if (ret != 0) {
1561                         DEBUG(DEBUG_ERR,
1562                               ("Failed to read available public IPs from node: %u\n",
1563                                node->pnn));
1564                         if (culprit) {
1565                                 *culprit = node->pnn;
1566                         }
1567                         return -1;
1568                 }
1569         }
1570
1571         return 0;
1572 }
1573
1574 /* when we start a recovery, make sure all nodes use the same reclock file
1575    setting
1576 */
1577 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1578 {
1579         struct ctdb_context *ctdb = rec->ctdb;
1580         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1581         TDB_DATA data;
1582         uint32_t *nodes;
1583
1584         if (ctdb->recovery_lock_file == NULL) {
1585                 data.dptr  = NULL;
1586                 data.dsize = 0;
1587         } else {
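                /* include the terminating NUL so the receiving node
                   gets a valid C string */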
1588                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1589                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1590         }
1591
1592         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1593         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1594                                         nodes, 0,
1595                                         CONTROL_TIMEOUT(),
1596                                         false, data,
1597                                         NULL, NULL,
1598                                         rec) != 0) {
1599                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1600                 talloc_free(tmp_ctx);
1601                 return -1;
1602         }
1603
1604         talloc_free(tmp_ctx);
1605         return 0;
1606 }
1607
1608
1609 /*
1610  * this callback is called for every node that failed to execute ctdb_takeover_run();
1611  * when banning credits are requested it marks that node as the recovery culprit
1612  */
1613 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1614 {
1615         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1616
1617         if (callback_data != NULL) {
1618                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1619
1620                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1621
1622                 ctdb_set_culprit(rec, node_pnn);
1623         }
1624 }
1625
1626
1627 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1628 {
1629         struct ctdb_context *ctdb = rec->ctdb;
1630         int i;
1631         struct ctdb_banning_state *ban_state;
1632
1633         *self_ban = false;
1634         for (i=0; i<ctdb->num_nodes; i++) {
1635                 if (ctdb->nodes[i]->ban_state == NULL) {
1636                         continue;
1637                 }
1638                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
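                /* Nodes are banned only after accumulating at least
                   2 * num_nodes banning credits. */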
1639                 if (ban_state->count < 2*ctdb->num_nodes) {
1640                         continue;
1641                 }
1642
1643                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1644                         ctdb->nodes[i]->pnn, ban_state->count,
1645                         ctdb->tunable.recovery_ban_period));
1646                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1647                 ban_state->count = 0;
1648
1649                 /* Banning ourself? */
1650                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1651                         *self_ban = true;
1652                 }
1653         }
1654 }
1655
1656 static bool do_takeover_run(struct ctdb_recoverd *rec,
1657                             struct ctdb_node_map *nodemap,
1658                             bool banning_credits_on_fail)
1659 {
1660         uint32_t *nodes = NULL;
1661         struct srvid_request dtr;
1662         TDB_DATA data;
1663         int i;
1664         int ret;
1665         bool ok;
1666
1667         if (rec->takeover_run_in_progress) {
1668                 DEBUG(DEBUG_ERR, (__location__
1669                                   " takeover run already in progress\n"));
1670                 ok = false;
1671                 goto done;
1672         }
1673
1674         rec->takeover_run_in_progress = true;
1675
1676         /* If takeover runs are disabled then fail... */
1677         if (rec->takeover_runs_disable_ctx != NULL) {
1678                 DEBUG(DEBUG_ERR,
1679                       ("Takeover runs are disabled so refusing to run one\n"));
1680                 ok = false;
1681                 goto done;
1682         }
1683
1684         /* Disable IP checks (takeover runs, really) on other nodes
1685          * while doing this takeover run.  This will stop those other
1686          * nodes from triggering takeover runs when they think they should
1687          * be hosting an IP but it isn't yet on an interface.  Don't
1688          * wait for replies since a failure here might cause some
1689          * noise in the logs but will not actually cause a problem.
1690          */
1691         dtr.srvid = 0; /* No reply */
1692         dtr.pnn = -1;
1693
1694         data.dptr  = (uint8_t*)&dtr;
1695         data.dsize = sizeof(dtr);
1696
1697         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1698
1699         /* Disable for 60 seconds.  This can be a tunable later if
1700          * necessary.
1701          */
1702         dtr.data = 60;
1703         for (i = 0; i < talloc_array_length(nodes); i++) {
1704                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1705                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1706                                              data) != 0) {
1707                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1708                 }
1709         }
1710
1711         ret = ctdb_takeover_run(rec->ctdb, nodemap, takeover_fail_callback,
1712                                 banning_credits_on_fail ? rec : NULL);
1713
1714         /* Reenable takeover runs and IP checks on other nodes */
1715         dtr.data = 0;
1716         for (i = 0; i < talloc_array_length(nodes); i++) {
1717                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1718                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1719                                              data) != 0) {
1720                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1721                 }
1722         }
1723
1724         if (ret != 0) {
1725                 DEBUG(DEBUG_ERR, ("IP reallocation failed\n"));
1726                 ok = false;
1727                 goto done;
1728         }
1729
1730         ok = true;
1731 done:
1732         rec->need_takeover_run = !ok;
1733         talloc_free(nodes);
1734         rec->takeover_run_in_progress = false;
1735         return ok;
1736 }
1737
1738
1739 /*
1740   we are the recmaster, and recovery is needed - start a recovery run
1741  */
1742 static int do_recovery(struct ctdb_recoverd *rec, 
1743                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1744                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1745 {
1746         struct ctdb_context *ctdb = rec->ctdb;
1747         int i, j, ret;
1748         uint32_t generation;
1749         struct ctdb_dbid_map *dbmap;
1750         TDB_DATA data;
1751         uint32_t *nodes;
1752         struct timeval start_time;
1753         uint32_t culprit = (uint32_t)-1;
1754         bool self_ban;
1755
1756         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1757
1758         /* if recovery fails, force it again */
1759         rec->need_recovery = true;
1760
1761         ban_misbehaving_nodes(rec, &self_ban);
1762         if (self_ban) {
1763                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1764                 return -1;
1765         }
1766
1767         if (ctdb->tunable.verify_recovery_lock != 0) {
1768                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1769                 start_time = timeval_current();
1770                 if (!ctdb_recovery_lock(ctdb, true)) {
1771                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1772                                          "and ban ourself for %u seconds\n",
1773                                          ctdb->tunable.recovery_ban_period));
1774                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1775                         return -1;
1776                 }
1777                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1778                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1779         }
1780
1781         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1782
1783         /* get a list of all databases */
1784         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1785         if (ret != 0) {
1786                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1787                 return -1;
1788         }
1789
1790         /* we do the db creation before we set the recovery mode, so the freeze happens
1791            on all databases we will be dealing with. */
1792
1793         /* verify that we have all the databases any other node has */
1794         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1795         if (ret != 0) {
1796                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1797                 return -1;
1798         }
1799
1800         /* verify that all other nodes have all our databases */
1801         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1802         if (ret != 0) {
1803                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1804                 return -1;
1805         }
1806         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1807
1808         /* update the database priority for all remote databases */
1809         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1812         }
1813         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1814
1815
1816         /* update all other nodes to use the same setting for reclock files
1817            as the local recovery master.
1818         */
1819         sync_recovery_lock_file_across_cluster(rec);
1820
1821         /* set recovery mode to active on all nodes */
1822         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1823         if (ret != 0) {
1824                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1825                 return -1;
1826         }
1827
1828         /* execute the "startrecovery" event script on all nodes */
1829         ret = run_startrecovery_eventscript(rec, nodemap);
1830         if (ret!=0) {
1831                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1832                 return -1;
1833         }
1834
1835         /*
1836           update all nodes to have the same flags that we have
1837          */
1838         for (i=0;i<nodemap->num;i++) {
1839                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1840                         continue;
1841                 }
1842
1843                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1844                 if (ret != 0) {
1845                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1846                         return -1;
1847                 }
1848         }
1849
1850         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1851
1852         /* pick a new generation number */
1853         generation = new_generation();
1854
1855         /* change the vnnmap on this node to use the new generation 
1856            number but not on any other nodes.
1857            this guarantees that if we abort the recovery prematurely
1858            for some reason (a node stops responding?)
1859            we can just return immediately and we will reenter
1860            recovery shortly again.
1861            I.e. we deliberately leave the cluster with an inconsistent
1862            generation id to allow us to abort recovery at any stage and
1863            just restart it from scratch.
1864          */
1865         vnnmap->generation = generation;
1866         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1867         if (ret != 0) {
1868                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1869                 return -1;
1870         }
1871
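        /* Ask every active node to start a recovery transaction tagged
           with the new generation, so the wipe and push of each database
           below can be committed (or cancelled) as a single unit. */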
1872         data.dptr = (void *)&generation;
1873         data.dsize = sizeof(uint32_t);
1874
1875         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1876         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1877                                         nodes, 0,
1878                                         CONTROL_TIMEOUT(), false, data,
1879                                         NULL,
1880                                         transaction_start_fail_callback,
1881                                         rec) != 0) {
1882                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1883                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1884                                         nodes, 0,
1885                                         CONTROL_TIMEOUT(), false, tdb_null,
1886                                         NULL,
1887                                         NULL,
1888                                         NULL) != 0) {
1889                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1890                 }
1891                 return -1;
1892         }
1893
1894         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1895
1896         for (i=0;i<dbmap->num;i++) {
1897                 ret = recover_database(rec, mem_ctx,
1898                                        dbmap->dbs[i].dbid,
1899                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1900                                        pnn, nodemap, generation);
1901                 if (ret != 0) {
1902                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1903                         return -1;
1904                 }
1905         }
1906
1907         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1908
1909         /* commit all the changes */
1910         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1911                                         nodes, 0,
1912                                         CONTROL_TIMEOUT(), false, data,
1913                                         NULL, NULL,
1914                                         NULL) != 0) {
1915                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1916                 return -1;
1917         }
1918
1919         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1920         
1921
1922         /* update the capabilities for all nodes */
1923         ret = update_capabilities(ctdb, nodemap);
1924         if (ret!=0) {
1925                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1926                 return -1;
1927         }
1928
1929         /* build a new vnn map with all the currently active and
1930            unbanned nodes */
1931         generation = new_generation();
1932         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1933         CTDB_NO_MEMORY(ctdb, vnnmap);
1934         vnnmap->generation = generation;
1935         vnnmap->size = 0;
1936         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1937         CTDB_NO_MEMORY(ctdb, vnnmap->map);
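        /* Start from an empty map and add each active node that is
           capable of being an lmaster. */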
1938         for (i=j=0;i<nodemap->num;i++) {
1939                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1940                         continue;
1941                 }
1942                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1943                         /* this node cannot be an lmaster */
1944                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1945                         continue;
1946                 }
1947
1948                 vnnmap->size++;
1949                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1950                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1951                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1952
1953         }
1954         if (vnnmap->size == 0) {
1955                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1956                 vnnmap->size++;
1957                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1958                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1959                 vnnmap->map[0] = pnn;
1960         }
1961
1962         /* update to the new vnnmap on all nodes */
1963         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1964         if (ret != 0) {
1965                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1966                 return -1;
1967         }
1968
1969         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1970
1971         /* update recmaster to point to us for all nodes */
1972         ret = set_recovery_master(ctdb, nodemap, pnn);
1973         if (ret!=0) {
1974                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1975                 return -1;
1976         }
1977
1978         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1979
1980         /*
1981           update all nodes to have the same flags that we have
1982          */
1983         for (i=0;i<nodemap->num;i++) {
1984                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1985                         continue;
1986                 }
1987
1988                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1989                 if (ret != 0) {
1990                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1991                         return -1;
1992                 }
1993         }
1994
1995         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1996
1997         /* disable recovery mode */
1998         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1999         if (ret != 0) {
2000                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2001                 return -1;
2002         }
2003
2004         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2005
2006         /* Fetch known/available public IPs from each active node */
2007         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2008         if (ret != 0) {
2009                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2010                                  culprit));
2011                 rec->need_takeover_run = true;
2012                 return -1;
2013         }
2014
2015         do_takeover_run(rec, nodemap, false);
2016
2017         /* execute the "recovered" event script on all nodes */
2018         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2019         if (ret!=0) {
2020                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2021                 return -1;
2022         }
2023
2024         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2025
2026         /* send a message to all clients telling them that the cluster 
2027            has been reconfigured */
2028         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
2029
2030         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2031
2032         rec->need_recovery = false;
2033
2034         /* we managed to complete a full recovery, make sure to forgive
2035            any past sins by the nodes that could now participate in the
2036            recovery.
2037         */
2038         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2039         for (i=0;i<nodemap->num;i++) {
2040                 struct ctdb_banning_state *ban_state;
2041
2042                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2043                         continue;
2044                 }
2045
2046                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2047                 if (ban_state == NULL) {
2048                         continue;
2049                 }
2050
2051                 ban_state->count = 0;
2052         }
2053
2054
2055         /* We just finished a recovery successfully. 
2056            We now wait for rerecovery_timeout before we allow 
2057            another recovery to take place.
2058         */
2059         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2060         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2061         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2062
2063         return 0;
2064 }
2065
2066
2067 /*
2068   elections are won by first checking the number of connected nodes, then
2069   the priority time, then the pnn
2070  */
2071 struct election_message {
2072         uint32_t num_connected;
2073         struct timeval priority_time;
2074         uint32_t pnn;
2075         uint32_t node_flags;
2076 };
2077
2078 /*
2079   form this node's election data
2080  */
2081 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2082 {
2083         int ret, i;
2084         struct ctdb_node_map *nodemap;
2085         struct ctdb_context *ctdb = rec->ctdb;
2086
2087         ZERO_STRUCTP(em);
2088
2089         em->pnn = rec->ctdb->pnn;
2090         em->priority_time = rec->priority_time;
2091
2092         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2093         if (ret != 0) {
2094                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
2095                 return;
2096         }
2097
2098         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2099         em->node_flags = rec->node_flags;
2100
2101         for (i=0;i<nodemap->num;i++) {
2102                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2103                         em->num_connected++;
2104                 }
2105         }
2106
2107         /* we shouldn't try to win this election if we can't be a recmaster */
2108         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2109                 em->num_connected = 0;
2110                 em->priority_time = timeval_current();
2111         }
2112
2113         talloc_free(nodemap);
2114 }
2115
2116 /*
2117   see if the given election data wins
2118  */
2119 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2120 {
2121         struct election_message myem;
2122         int cmp = 0;
2123
2124         ctdb_election_data(rec, &myem);
2125
2126         /* we can't win if we don't have the recmaster capability */
2127         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2128                 return false;
2129         }
2130
2131         /* we can't win if we are banned */
2132         if (rec->node_flags & NODE_FLAGS_BANNED) {
2133                 return false;
2134         }
2135
2136         /* we can't win if we are stopped */
2137         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2138                 return false;
2139         }
2140
2141         /* we will automatically win if the other node is banned */
2142         if (em->node_flags & NODE_FLAGS_BANNED) {
2143                 return true;
2144         }
2145
2146         /* we will automatically win if the other node is stopped */
2147         if (em->node_flags & NODE_FLAGS_STOPPED) {
2148                 return true;
2149         }
2150
2151         /* try to use the most connected node */
2152         if (cmp == 0) {
2153                 cmp = (int)myem.num_connected - (int)em->num_connected;
2154         }
2155
2156         /* then the longest running node */
2157         if (cmp == 0) {
2158                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2159         }
2160
2161         if (cmp == 0) {
2162                 cmp = (int)myem.pnn - (int)em->pnn;
2163         }
2164
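        /* A positive result means our election data beats theirs,
           so we win. */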
2165         return cmp > 0;
2166 }
2167
2168 /*
2169   send out an election request
2170  */
2171 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2172 {
2173         int ret;
2174         TDB_DATA election_data;
2175         struct election_message emsg;
2176         uint64_t srvid;
2177         struct ctdb_context *ctdb = rec->ctdb;
2178
2179         srvid = CTDB_SRVID_RECOVERY;
2180
2181         ctdb_election_data(rec, &emsg);
2182
2183         election_data.dsize = sizeof(struct election_message);
2184         election_data.dptr  = (unsigned char *)&emsg;
2185
2186
2187         /* send an election message to all active nodes */
2188         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2189         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2190
2191
2192         /* A new node that is already frozen has entered the cluster.
2193            The existing nodes are not frozen and don't need to be frozen
2194            until the election has ended and we start the actual recovery
2195         */
2196         if (update_recmaster == true) {
2197                 /* first we assume we will win the election and set 
2198                    recoverymaster to be ourself on the current node
2199                  */
2200                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2201                 if (ret != 0) {
2202                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2203                         return -1;
2204                 }
2205         }
2206
2207
2208         return 0;
2209 }
2210
2211 /*
2212   this function will unban all nodes in the cluster
2213 */
2214 static void unban_all_nodes(struct ctdb_context *ctdb)
2215 {
2216         int ret, i;
2217         struct ctdb_node_map *nodemap;
2218         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2219         
2220         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2221         if (ret != 0) {
2222                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2223                 return;
2224         }
2225
2226         for (i=0;i<nodemap->num;i++) {
2227                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2228                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2229                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2230                 }
2231         }
2232
2233         talloc_free(tmp_ctx);
2234 }
2235
2236
2237 /*
2238   we think we are winning the election - send a broadcast election request
2239  */
2240 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2241 {
2242         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2243         int ret;
2244
2245         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2246         if (ret != 0) {
2247                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2248         }
2249
2250         talloc_free(rec->send_election_te);
2251         rec->send_election_te = NULL;
2252 }
2253
2254 /*
2255   handler for memory dumps
2256 */
2257 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2258                              TDB_DATA data, void *private_data)
2259 {
2260         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2261         TDB_DATA *dump;
2262         int ret;
2263         struct srvid_request *rd;
2264
2265         if (data.dsize != sizeof(struct srvid_request)) {
2266                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2267                 talloc_free(tmp_ctx);
2268                 return;
2269         }
2270         rd = (struct srvid_request *)data.dptr;
2271
2272         dump = talloc_zero(tmp_ctx, TDB_DATA);
2273         if (dump == NULL) {
2274                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2275                 talloc_free(tmp_ctx);
2276                 return;
2277         }
2278         ret = ctdb_dump_memory(ctdb, dump);
2279         if (ret != 0) {
2280                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2281                 talloc_free(tmp_ctx);
2282                 return;
2283         }
2284
2285         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2286
2287         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2288         if (ret != 0) {
2289                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2290                 talloc_free(tmp_ctx);
2291                 return;
2292         }
2293
2294         talloc_free(tmp_ctx);
2295 }
2296
2297 /*
2298   handler for getlog
2299 */
2300 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2301                            TDB_DATA data, void *private_data)
2302 {
2303         struct ctdb_get_log_addr *log_addr;
2304         pid_t child;
2305
2306         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2307                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2308                 return;
2309         }
2310         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2311
2312         child = ctdb_fork_no_free_ringbuffer(ctdb);
2313         if (child == (pid_t)-1) {
2314                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2315                 return;
2316         }
2317
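        /* In the child: switch from server to client mode and send the
           collected log entries to the address the requester supplied. */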
2318         if (child == 0) {
2319                 ctdb_set_process_name("ctdb_rec_log_collector");
2320                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2321                         DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch log collector child into client mode.\n"));
2322                         _exit(1);
2323                 }
2324                 ctdb_collect_log(ctdb, log_addr);
2325                 _exit(0);
2326         }
2327 }
2328
2329 /*
2330   handler for clearlog
2331 */
2332 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2333                              TDB_DATA data, void *private_data)
2334 {
2335         ctdb_clear_log(ctdb);
2336 }
2337
2338 /*
2339   handler for reload_nodes
2340 */
2341 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2342                              TDB_DATA data, void *private_data)
2343 {
2344         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2345
2346         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2347
2348         reload_nodes_file(rec->ctdb);
2349 }
2350
2351
2352 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2353                                   struct timeval t, void *p)
2354 {
2355         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2356
2357         DEBUG(DEBUG_NOTICE,
2358               ("Rebalance all nodes that have had ip assignment changes.\n"));
2359
2360         do_takeover_run(rec, rec->nodemap, false);
2361
2362         talloc_free(rec->deferred_rebalance_ctx);
2363         rec->deferred_rebalance_ctx = NULL;
2364 }
2365
2366         
2367 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2368                              TDB_DATA data, void *private_data)
2369 {
2370         uint32_t pnn;
2371         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2372
2373         if (data.dsize != sizeof(uint32_t)) {
2374                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2375                 return;
2376         }
2377
2378         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2379                 return;
2380         }
2381
2382         pnn = *(uint32_t *)&data.dptr[0];
2383
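        /* Tell the LCP2 IP allocation code to force a rebalance so that
           addresses can move onto the (re)added node. */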
2384         lcp2_forcerebalance(ctdb, pnn);
2385         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2386
2387         if (rec->deferred_rebalance_ctx != NULL) {
2388                 talloc_free(rec->deferred_rebalance_ctx);
2389         }
2390         rec->deferred_rebalance_ctx = talloc_new(rec);
2391         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2392                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2393                         ctdb_rebalance_timeout, rec);
2394 }
2395
2396
2397
2398 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2399                              TDB_DATA data, void *private_data)
2400 {
2401         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2402         struct ctdb_public_ip *ip;
2403
2404         if (rec->recmaster != rec->ctdb->pnn) {
2405                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2406                 return;
2407         }
2408
2409         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2410                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2411                 return;
2412         }
2413
2414         ip = (struct ctdb_public_ip *)data.dptr;
2415
2416         update_ip_assignment_tree(rec->ctdb, ip);
2417 }
2418
2419
2420 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2421 {
2422         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2423 }
2424
2425 static void reenable_takeover_runs(struct event_context *ev,
2426                                    struct timed_event *te,
2427                                    struct timeval yt, void *p)
2428 {
2429         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2430
2431         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2432         clear_takeover_runs_disable(rec);
2433 }
2434
2435 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2436                                           uint64_t srvid, TDB_DATA data,
2437                                           void *private_data)
2438 {
2439         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2440                                                     struct ctdb_recoverd);
2441         struct srvid_request *r;
2442         uint32_t timeout;
2443         TDB_DATA result;
2444         int32_t ret = 0;
2445
2446         /* Validate input data.  On a malformed request no return
2447          * address is available ("r" is not yet set), so return
2448          * without replying rather than jumping to "done". */
2449         if (data.dsize != sizeof(struct srvid_request)) {
2450                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2451                                  "expecting %lu\n", (long unsigned)data.dsize,
2452                                  (long unsigned)sizeof(struct srvid_request)));
2453                 return;
2454         }
2455         if (data.dptr == NULL) {
2456                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2457                 return;
2458         }
2459
2460         r = (struct srvid_request *)data.dptr;
2461         timeout = r->data;
2462
2463         if (timeout == 0) {
2464                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2465                 clear_takeover_runs_disable(rec);
2466                 ret = ctdb_get_pnn(ctdb);
2467                 goto done;
2468         }
2469
2470         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
2471                 DEBUG(DEBUG_ERR,
2472                       ("Refusing to disable takeover runs on inactive node\n"));
2473                 ret = -EHOSTDOWN;
2474                 goto done;
2475         }
2476
2477         if (rec->takeover_run_in_progress) {
2478                 DEBUG(DEBUG_ERR,
2479                       ("Unable to disable takeover runs - in progress\n"));
2480                 ret = -EAGAIN;
2481                 goto done;
2482         }
2483
2484         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2485
2486         /* Clear any old timers */
2487         clear_takeover_runs_disable(rec);
2488
2489         /* When this is non-NULL it indicates that takeover runs are
2490          * disabled.  This context also holds the timeout timer.
2491          */
2492         rec->takeover_runs_disable_ctx = talloc_new(rec);
2493         if (rec->takeover_runs_disable_ctx == NULL) {
2494                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2495                 ret = -ENOMEM;
2496                 goto done;
2497         }
2498
2499         /* Arrange for the timeout to occur */
2500         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2501                         timeval_current_ofs(timeout, 0),
2502                         reenable_takeover_runs,
2503                         rec);
2504
2505         /* Returning our PNN tells the caller that we succeeded */
2506         ret = ctdb_get_pnn(ctdb);
2507 done:
2508         result.dsize = sizeof(int32_t);
2509         result.dptr  = (uint8_t *)&ret;
2510         srvid_request_reply(ctdb, r, result);
2511 }
2512
2513 /* Backward compatibility for this SRVID - call
2514  * disable_takeover_runs_handler() instead
2515  */
2516 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2517                                      TDB_DATA data, void *private_data)
2518 {
2519         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2520                                                     struct ctdb_recoverd);
2521         TDB_DATA data2;
2522         struct srvid_request *req;
2523
2524         if (data.dsize != sizeof(uint32_t)) {
2525                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2526                                  "expecting %lu\n", (long unsigned)data.dsize,
2527                                  (long unsigned)sizeof(uint32_t)));
2528                 return;
2529         }
2530         if (data.dptr == NULL) {
2531                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2532                 return;
2533         }
2534
2535         req = talloc(ctdb, struct srvid_request);
2536         CTDB_NO_MEMORY_VOID(ctdb, req);
2537
2538         req->srvid = 0; /* No reply */
2539         req->pnn = -1;
2540         req->data = *((uint32_t *)data.dptr); /* Timeout */
2541
2542         data2.dsize = sizeof(*req);
2543         data2.dptr = (uint8_t *)req;
2544
2545         disable_takeover_runs_handler(rec->ctdb,
2546                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2547                                       data2, rec);
2548 }
2549
2550 /*
2551   handler for ip reallocate: just add the request to the list and
2552   handle it later in the monitor_cluster loop, so we do not recurse
2553   into takeover_run() while handling other requests
2554 */
2555 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2556                                   TDB_DATA data, void *private_data)
2557 {
2558         struct srvid_request *request;
2559         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2560                                                     struct ctdb_recoverd);
2561
2562         if (data.dsize != sizeof(struct srvid_request)) {
2563                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2564                 return;
2565         }
2566
2567         request = (struct srvid_request *)data.dptr;
2568
2569         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2570 }
2571
2572 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2573                                           struct ctdb_recoverd *rec)
2574 {
2575         TDB_DATA result;
2576         int32_t ret;
2577         uint32_t culprit;
2578
2579         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2580
2581         /* update the list of public ips that a node can handle for
2582            all connected nodes
2583         */
2584         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2585         if (ret != 0) {
2586                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2587                                  culprit));
2588                 rec->need_takeover_run = true;
2589         }
2590         if (ret == 0) {
2591                 if (do_takeover_run(rec, rec->nodemap, false)) {
2592                         ret = ctdb_get_pnn(ctdb);
2593                 } else {
2594                         ret = -1;
2595                 }
2596         }
2597
2598         result.dsize = sizeof(int32_t);
2599         result.dptr  = (uint8_t *)&ret;
2600
2601         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2602 }
2603
2604
2605 /*
2606   handler for recovery master elections
2607 */
2608 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2609                              TDB_DATA data, void *private_data)
2610 {
2611         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2612         int ret;
2613         struct election_message *em = (struct election_message *)data.dptr;
2614         TALLOC_CTX *mem_ctx;
2615
2616         /* we got an election packet - update the timeout for the election */
2617         talloc_free(rec->election_timeout);
2618         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2619                                                 fast_start ?
2620                                                 timeval_current_ofs(0, 500000) :
2621                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2622                                                 ctdb_election_timeout, rec);
2623
2624         mem_ctx = talloc_new(ctdb);
2625
2626         /* someone called an election. check their election data
2627            and if we disagree and we would rather be the elected node, 
2628            send a new election message to all other nodes
2629          */
2630         if (ctdb_election_win(rec, em)) {
2631                 if (!rec->send_election_te) {
2632                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2633                                                                 timeval_current_ofs(0, 500000),
2634                                                                 election_send_request, rec);
2635                 }
2636                 talloc_free(mem_ctx);
2637                 /*unban_all_nodes(ctdb);*/
2638                 return;
2639         }
2640         
2641         /* we didn't win */
2642         talloc_free(rec->send_election_te);
2643         rec->send_election_te = NULL;
2644
2645         if (ctdb->tunable.verify_recovery_lock != 0) {
2646                 /* release the recmaster lock */
2647                 if (em->pnn != ctdb->pnn &&
2648                     ctdb->recovery_lock_fd != -1) {
2649                         close(ctdb->recovery_lock_fd);
2650                         ctdb->recovery_lock_fd = -1;
2651                         unban_all_nodes(ctdb);
2652                 }
2653         }
2654
2655         /* ok, let that guy become recmaster then */
2656         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2657         if (ret != 0) {
2658                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2659                 talloc_free(mem_ctx);
2660                 return;
2661         }
2662
2663         talloc_free(mem_ctx);
2664         return;
2665 }
2666
2667
2668 /*
2669   force the start of the election process
2670  */
2671 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2672                            struct ctdb_node_map *nodemap)
2673 {
2674         int ret;
2675         struct ctdb_context *ctdb = rec->ctdb;
2676
2677         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2678
2679         /* set all nodes to recovery mode to stop all internode traffic */
2680         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2681         if (ret != 0) {
2682                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2683                 return;
2684         }
2685
2686         talloc_free(rec->election_timeout);
2687         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2688                                                 fast_start ?
2689                                                 timeval_current_ofs(0, 500000) :
2690                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2691                                                 ctdb_election_timeout, rec);
2692
2693         ret = send_election_request(rec, pnn, true);
2694         if (ret!=0) {
2695                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2696                 return;
2697         }
2698
2699         /* wait for a few seconds to collect all responses */
2700         ctdb_wait_election(rec);
2701 }
2702
2703
2704
2705 /*
2706   handler for when a node changes its flags
2707 */
2708 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2709                             TDB_DATA data, void *private_data)
2710 {
2711         int ret;
2712         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2713         struct ctdb_node_map *nodemap=NULL;
2714         TALLOC_CTX *tmp_ctx;
2715         int i;
2716         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2717         int disabled_flag_changed;
2718
2719         if (data.dsize != sizeof(*c)) {
2720                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2721                 return;
2722         }
2723
2724         tmp_ctx = talloc_new(ctdb);
2725         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2726
2727         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2728         if (ret != 0) {
2729                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2730                 talloc_free(tmp_ctx);
2731                 return;
2732         }
2733
2734
2735         for (i=0;i<nodemap->num;i++) {
2736                 if (nodemap->nodes[i].pnn == c->pnn) break;
2737         }
2738
2739         if (i == nodemap->num) {
2740                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2741                 talloc_free(tmp_ctx);
2742                 return;
2743         }
2744
2745         if (c->old_flags != c->new_flags) {
2746                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2747         }
2748
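        /* Record whether the DISABLED bits (unhealthy or permanently
           disabled) changed; see the check below. */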
2749         disabled_flag_changed = (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2750
2751         nodemap->nodes[i].flags = c->new_flags;
2752
2753         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2754                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2755
2756         if (ret == 0) {
2757                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2758                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2759         }
2760         
2761         if (ret == 0 &&
2762             ctdb->recovery_master == ctdb->pnn &&
2763             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2764                 /* Only do the takeover run if the perm disabled or unhealthy
2765                    flags changed since these will cause an ip failover but not
2766                    a recovery.
2767                    If the node became disconnected or banned this will also
2768                    lead to an ip address failover but that is handled 
2769                    during recovery
2770                 */
2771                 if (disabled_flag_changed) {
2772                         rec->need_takeover_run = true;
2773                 }
2774         }
2775
2776         talloc_free(tmp_ctx);
2777 }
2778
2779 /*
2780   handler for when we need to push out flag changes to all other nodes
2781 */
2782 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2783                             TDB_DATA data, void *private_data)
2784 {
2785         int ret;
2786         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2787         struct ctdb_node_map *nodemap=NULL;
2788         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2789         uint32_t recmaster;
2790         uint32_t *nodes;
2791
2792         /* find the recovery master */
2793         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2794         if (ret != 0) {
2795                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2796                 talloc_free(tmp_ctx);
2797                 return;
2798         }
2799
2800         /* read the node flags from the recmaster */
2801         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2802         if (ret != 0) {
2803                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2804                 talloc_free(tmp_ctx);
2805                 return;
2806         }
2807         if (c->pnn >= nodemap->num) {
2808                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %u\n", c->pnn));
2809                 talloc_free(tmp_ctx);
2810                 return;
2811         }
2812
2813         /* send the flags update to all connected nodes */
2814         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2815
2816         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2817                                       nodes, 0, CONTROL_TIMEOUT(),
2818                                       false, data,
2819                                       NULL, NULL,
2820                                       NULL) != 0) {
2821                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2822
2823                 talloc_free(tmp_ctx);
2824                 return;
2825         }
2826
2827         talloc_free(tmp_ctx);
2828 }
2829
2830
2831 struct verify_recmode_normal_data {
2832         uint32_t count;
2833         enum monitor_result status;
2834 };
2835
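     /* Completion callback for the async getrecmode fan-out in
        verify_recmode() below: every reply or failure decrements
        rmdata->count, and a single node still in recovery mode is enough
        to escalate rmdata->status to MONITOR_RECOVERY_NEEDED. */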
2836 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2837 {
2838         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2839
2840
2841         /* one more node has responded with recmode data */
2842         rmdata->count--;
2843
2844         /* if we failed to get the recmode, flag the failure and let
2845            the main loop try again.
2846         */
2847         if (state->state != CTDB_CONTROL_DONE) {
2848                 if (rmdata->status == MONITOR_OK) {
2849                         rmdata->status = MONITOR_FAILED;
2850                 }
2851                 return;
2852         }
2853
2854         /* if we got a response, then the recmode will be stored in the
2855            status field
2856         */
2857         if (state->status != CTDB_RECOVERY_NORMAL) {
2858                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2859                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2860         }
2861
2862         return;
2863 }
2864
2865
2866 /* verify that all nodes are in normal recovery mode */
2867 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2868 {
2869         struct verify_recmode_normal_data *rmdata;
2870         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2871         struct ctdb_client_control_state *state;
2872         enum monitor_result status;
2873         int j;
2874         
2875         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2876         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2877         rmdata->count  = 0;
2878         rmdata->status = MONITOR_OK;
2879
2880         /* loop over all active nodes and send an async getrecmode call to 
2881            them */
2882         for (j=0; j<nodemap->num; j++) {
2883                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2884                         continue;
2885                 }
2886                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2887                                         CONTROL_TIMEOUT(), 
2888                                         nodemap->nodes[j].pnn);
2889                 if (state == NULL) {
2890                         /* we failed to send the control, treat this as 
2891                            an error and try again next iteration
2892                         */                      
2893                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2894                         talloc_free(mem_ctx);
2895                         return MONITOR_FAILED;
2896                 }
2897
2898                 /* set up the callback functions */
2899                 state->async.fn = verify_recmode_normal_callback;
2900                 state->async.private_data = rmdata;
2901
2902                 /* one more control to wait for to complete */
2903                 rmdata->count++;
2904         }
2905
2906
2907         /* now wait for up to the maximum number of seconds allowed
2908            or until all nodes we expect a response from have replied
2909         */
2910         while (rmdata->count > 0) {
2911                 event_loop_once(ctdb->ev);
2912         }
2913
2914         status = rmdata->status;
2915         talloc_free(mem_ctx);
2916         return status;
2917 }
2918
2919
2920 struct verify_recmaster_data {
2921         struct ctdb_recoverd *rec;
2922         uint32_t count;
2923         uint32_t pnn;
2924         enum monitor_result status;
2925 };
2926
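     /* Like verify_recmode_normal_callback() above, but for getrecmaster:
        a node that disagrees about who the recovery master is gets a
        culprit count and escalates the status to MONITOR_ELECTION_NEEDED. */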
2927 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2928 {
2929         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2930
2931
2932         /* one more node has responded with recmaster data */
2933         rmdata->count--;
2934
2935         /* if we failed to get the recmaster, flag the failure and let
2936            the main loop try again.
2937         */
2938         if (state->state != CTDB_CONTROL_DONE) {
2939                 if (rmdata->status == MONITOR_OK) {
2940                         rmdata->status = MONITOR_FAILED;
2941                 }
2942                 return;
2943         }
2944
2945         /* if we got a response, then the recmaster will be stored in the
2946            status field
2947         */
2948         if (state->status != rmdata->pnn) {
2949                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2950                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2951                 rmdata->status = MONITOR_ELECTION_NEEDED;
2952         }
2953
2954         return;
2955 }
2956
2957
2958 /* verify that all nodes agree that we are the recmaster */
2959 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2960 {
2961         struct ctdb_context *ctdb = rec->ctdb;
2962         struct verify_recmaster_data *rmdata;
2963         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2964         struct ctdb_client_control_state *state;
2965         enum monitor_result status;
2966         int j;
2967         
2968         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2969         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2970         rmdata->rec    = rec;
2971         rmdata->count  = 0;
2972         rmdata->pnn    = pnn;
2973         rmdata->status = MONITOR_OK;
2974
2975         /* loop over all active nodes and send an async getrecmaster call to 
2976            them */
2977         for (j=0; j<nodemap->num; j++) {
2978                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2979                         continue;
2980                 }
2981                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2982                                         CONTROL_TIMEOUT(),
2983                                         nodemap->nodes[j].pnn);
2984                 if (state == NULL) {
2985                         /* we failed to send the control, treat this as 
2986                            an error and try again next iteration
2987                         */                      
2988                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2989                         talloc_free(mem_ctx);
2990                         return MONITOR_FAILED;
2991                 }
2992
2993                 /* set up the callback functions */
2994                 state->async.fn = verify_recmaster_callback;
2995                 state->async.private_data = rmdata;
2996
2997                 /* one more control to wait for to complete */
2998                 rmdata->count++;
2999         }
3000
3001
3002         /* now wait for up to the maximum number of seconds allowed
3003            or until all nodes we expect a response from have replied
3004         */
3005         while (rmdata->count > 0) {
3006                 event_loop_once(ctdb->ev);
3007         }
3008
3009         status = rmdata->status;
3010         talloc_free(mem_ctx);
3011         return status;
3012 }
3013
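     /* Fetch the interface list from the local daemon and compare it with
        the copy cached in rec->ifaces; a change in interface count, name or
        link state means public ip assignments may need to be revisited. */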
3014 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3015                                     struct ctdb_recoverd *rec)
3016 {
3017         struct ctdb_control_get_ifaces *ifaces = NULL;
3018         TALLOC_CTX *mem_ctx;
3019         bool ret = false;
3020
3021         mem_ctx = talloc_new(NULL);
3022
3023         /* Read the interfaces from the local node */
3024         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3025                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3026                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3027                 /* We could return an error.  However, this will be
3028                  * rare so we'll decide that the interfaces have
3029                  * actually changed, just in case.
3030                  */
3031                 talloc_free(mem_ctx);
3032                 return true;
3033         }
3034
3035         if (!rec->ifaces) {
3036                 /* We haven't been here before so things have changed */
3037                 DEBUG(DEBUG_NOTICE, ("Initial interface list fetched\n"));
3038                 ret = true;
3039         } else if (rec->ifaces->num != ifaces->num) {
3040                 /* Number of interfaces has changed */
3041                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3042                                      rec->ifaces->num, ifaces->num));
3043                 ret = true;
3044         } else {
3045                 /* See if interface names or link states have changed */
3046                 int i;
3047                 for (i = 0; i < rec->ifaces->num; i++) {
3048                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3049                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3050                                 DEBUG(DEBUG_NOTICE,
3051                                       ("Interface in slot %d changed: %s => %s\n",
3052                                        i, iface->name, ifaces->ifaces[i].name));
3053                                 ret = true;
3054                                 break;
3055                         }
3056                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3057                                 DEBUG(DEBUG_NOTICE,
3058                                       ("Interface %s changed state: %d => %d\n",
3059                                        iface->name, iface->link_state,
3060                                        ifaces->ifaces[i].link_state));
3061                                 ret = true;
3062                                 break;
3063                         }
3064                 }
3065         }
3066
3067         talloc_free(rec->ifaces);
3068         rec->ifaces = talloc_steal(rec, ifaces);
3069
3070         talloc_free(mem_ctx);
3071         return ret;
3072 }
3073
3074 /* called to check that the local allocation of public ip addresses is ok.
3075 */
3076 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3077 {
3078         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3079         struct ctdb_uptime *uptime1 = NULL;
3080         struct ctdb_uptime *uptime2 = NULL;
3081         int ret, j;
3082         bool need_takeover_run = false;
3083
3084         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3085                                 CTDB_CURRENT_NODE, &uptime1);
3086         if (ret != 0) {
3087                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3088                 talloc_free(mem_ctx);
3089                 return -1;
3090         }
3091
3092         if (interfaces_have_changed(ctdb, rec)) {
3093                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3094                                      "local node %u - force takeover run\n",
3095                                      pnn));
3096                 need_takeover_run = true;
3097         }
3098
3099         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3100                                 CTDB_CURRENT_NODE, &uptime2);
3101         if (ret != 0) {
3102                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3103                 talloc_free(mem_ctx);
3104                 return -1;
3105         }
3106
3107         /* skip the check if the startrecovery time has changed */
3108         if (timeval_compare(&uptime1->last_recovery_started,
3109                             &uptime2->last_recovery_started) != 0) {
3110                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3111                 talloc_free(mem_ctx);
3112                 return 0;
3113         }
3114
3115         /* skip the check if the endrecovery time has changed */
3116         if (timeval_compare(&uptime1->last_recovery_finished,
3117                             &uptime2->last_recovery_finished) != 0) {
3118                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3119                 talloc_free(mem_ctx);
3120                 return 0;
3121         }
3122
3123         /* skip the check if we have started but not finished recovery */
3124         if (timeval_compare(&uptime1->last_recovery_finished,
3125                             &uptime1->last_recovery_started) != 1) {
3126                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3127                 talloc_free(mem_ctx);
3128
3129                 return 0;
3130         }
3131
3132         /* verify that we have the ip addresses we should have
3133            and we don't have ones we shouldn't have.
3134            if we find an unassigned ip (pnn == -1) that we are healthy
3135            enough to host, or an ip assigned to us that is missing from
3136            our interfaces, we ask the recmaster for a takeover run.
3137            an ip that we are still serving but should not be is
3138            released locally straight away.
3139         */
3140         if (ctdb->tunable.disable_ip_failover == 0) {
3141                 struct ctdb_all_public_ips *ips = NULL;
3142
3143                 /* read the *available* IPs from the local node */
3144                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3145                 if (ret != 0) {
3146                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3147                         talloc_free(mem_ctx);
3148                         return -1;
3149                 }
3150
3151                 for (j=0; j<ips->num; j++) {
3152                         if (ips->ips[j].pnn == -1 &&
3153                             nodemap->nodes[pnn].flags == 0) {
3154                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3155                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3156                                 need_takeover_run = true;
3157                         }
3158                 }
3159
3160                 talloc_free(ips);
3161
3162                 /* read the *known* IPs from the local node */
3163                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3164                 if (ret != 0) {
3165                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3166                         talloc_free(mem_ctx);
3167                         return -1;
3168                 }
3169
3170                 for (j=0; j<ips->num; j++) {
3171                         if (ips->ips[j].pnn == pnn) {
3172                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3173                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3174                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3175                                         need_takeover_run = true;
3176                                 }
3177                         } else {
3178                                 if (ctdb->do_checkpublicip &&
3179                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3180
3181                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3182                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3183
3184                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3185                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3186                                         }
3187                                 }
3188                         }
3189                 }
3190         }
3191
3192         if (need_takeover_run) {
3193                 struct srvid_request rd;
3194                 TDB_DATA data;
3195
3196                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3197
3198                 rd.pnn = ctdb->pnn;
3199                 rd.srvid = 0;
3200                 data.dptr = (uint8_t *)&rd;
3201                 data.dsize = sizeof(rd);
3202
3203                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3204                 if (ret != 0) {
3205                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3206                 }
3207         }
3208         talloc_free(mem_ctx);
3209         return 0;
3210 }
3211
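     /* For reference, the takeover run request sent above can equally be
        issued by any client connected to the daemon.  A minimal sketch,
        assuming a connected ctdb context and a known recmaster pnn (srvid
        is left as 0 because no reply is wanted):

             struct srvid_request rd = { .pnn = ctdb_get_pnn(ctdb), .srvid = 0 };
             TDB_DATA data = { .dptr = (uint8_t *)&rd, .dsize = sizeof(rd) };

             ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN,
                                      data);
     */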
3212
3213 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3214 {
3215         struct ctdb_node_map **remote_nodemaps = callback_data;
3216
3217         if (node_pnn >= ctdb->num_nodes) {
3218                 DEBUG(DEBUG_ERR,(__location__ " nodemap reply from invalid node pnn %u\n", node_pnn));
3219                 return;
3220         }
3221
3222         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3223
3224 }
3225
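     /* Fan out a GET_NODEMAP control to every active node; each reply is
        parked in remote_nodemaps[pnn] by the callback above so main_loop
        can cross-check the different views of the cluster. */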
3226 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3227         struct ctdb_node_map *nodemap,
3228         struct ctdb_node_map **remote_nodemaps)
3229 {
3230         uint32_t *nodes;
3231
3232         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3233         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3234                                         nodes, 0,
3235                                         CONTROL_TIMEOUT(), false, tdb_null,
3236                                         async_getnodemap_callback,
3237                                         NULL,
3238                                         remote_nodemaps) != 0) {
3239                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3240
3241                 return -1;
3242         }
3243
3244         return 0;
3245 }
3246
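     /* Recovery lock sanity check: the recovery master forks a child that
        does a single pread() on the reclock file and reports one status
        byte back through a pipe.  The parent event-loops until the byte
        arrives, the child fails, or a 15 second timer fires. */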
3247 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3248 struct ctdb_check_reclock_state {
3249         struct ctdb_context *ctdb;
3250         struct timeval start_time;
3251         int fd[2];
3252         pid_t child;
3253         struct timed_event *te;
3254         struct fd_event *fde;
3255         enum reclock_child_status status;
3256 };
3257
3258 /* when we free the reclock state we must kill any child process.
3259 */
3260 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3261 {
3262         struct ctdb_context *ctdb = state->ctdb;
3263
3264         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3265
3266         if (state->fd[0] != -1) {
3267                 close(state->fd[0]);
3268                 state->fd[0] = -1;
3269         }
3270         if (state->fd[1] != -1) {
3271                 close(state->fd[1]);
3272                 state->fd[1] = -1;
3273         }
3274         ctdb_kill(ctdb, state->child, SIGKILL);
3275         return 0;
3276 }
3277
3278 /*
3279   called if our check_reclock child times out. this would happen if
3280   i/o to the reclock file blocks.
3281  */
3282 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3283                                          struct timeval t, void *private_data)
3284 {
3285         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3286                                            struct ctdb_check_reclock_state);
3287
3288         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3289         state->status = RECLOCK_TIMEOUT;
3290 }
3291
3292 /* this is called when the child process has completed checking the reclock
3293    file and has written data back to us through the pipe.
3294 */
3295 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3296                              uint16_t flags, void *private_data)
3297 {
3298         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3299                                              struct ctdb_check_reclock_state);
3300         char c = 0;
3301         int ret;
3302
3303         /* we got a response from our child process so we can abort the
3304            timeout.
3305         */
3306         talloc_free(state->te);
3307         state->te = NULL;
3308
3309         ret = read(state->fd[0], &c, 1);
3310         if (ret != 1 || c != RECLOCK_OK) {
3311                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3312                 state->status = RECLOCK_FAILED;
3313
3314                 return;
3315         }
3316
3317         state->status = RECLOCK_OK;
3318         return;
3319 }
3320
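     /* Called from main_loop() on the recovery master while reclock
        verification is enabled; any failure here forces a recovery. */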
3321 static int check_recovery_lock(struct ctdb_context *ctdb)
3322 {
3323         int ret;
3324         struct ctdb_check_reclock_state *state;
3325         pid_t parent = getpid();
3326
3327         if (ctdb->recovery_lock_fd == -1) {
3328                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3329                 return -1;
3330         }
3331
3332         state = talloc(ctdb, struct ctdb_check_reclock_state);
3333         CTDB_NO_MEMORY(ctdb, state);
3334
3335         state->ctdb = ctdb;
3336         state->start_time = timeval_current();
3337         state->status = RECLOCK_CHECKING;
3338         state->fd[0] = -1;
3339         state->fd[1] = -1;
3340
3341         ret = pipe(state->fd);
3342         if (ret != 0) {
3343                 talloc_free(state);
3344                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3345                 return -1;
3346         }
3347
3348         state->child = ctdb_fork(ctdb);
3349         if (state->child == (pid_t)-1) {
3350                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3351                 close(state->fd[0]);
3352                 state->fd[0] = -1;
3353                 close(state->fd[1]);
3354                 state->fd[1] = -1;
3355                 talloc_free(state);
3356                 return -1;
3357         }
3358
3359         if (state->child == 0) {
3360                 char cc = RECLOCK_OK;
3361                 close(state->fd[0]);
3362                 state->fd[0] = -1;
3363
3364                 ctdb_set_process_name("ctdb_rec_reclock");
3365                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3366                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3367                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3368                         cc = RECLOCK_FAILED;
3369                 }
3370
3371                 write(state->fd[1], &cc, 1);
3372                 /* make sure we die when our parent dies */
3373                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3374                         sleep(5);
3375                 }
3376                 _exit(0);
3377         }
3378         close(state->fd[1]);
3379         state->fd[1] = -1;
3380         set_close_on_exec(state->fd[0]);
3381
3382         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3383
3384         talloc_set_destructor(state, check_reclock_destructor);
3385
3386         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3387                                     ctdb_check_reclock_timeout, state);
3388         if (state->te == NULL) {
3389                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3390                 talloc_free(state);
3391                 return -1;
3392         }
3393
3394         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3395                                 EVENT_FD_READ,
3396                                 reclock_child_handler,
3397                                 (void *)state);
3398
3399         if (state->fde == NULL) {
3400                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3401                 talloc_free(state);
3402                 return -1;
3403         }
3404         tevent_fd_set_auto_close(state->fde);
3405
3406         while (state->status == RECLOCK_CHECKING) {
3407                 event_loop_once(ctdb->ev);
3408         }
3409
3410         if (state->status == RECLOCK_FAILED) {
3411                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3412                 close(ctdb->recovery_lock_fd);
3413                 ctdb->recovery_lock_fd = -1;
3414                 talloc_free(state);
3415                 return -1;
3416         }
3417
3418         talloc_free(state);
3419         return 0;
3420 }
3421
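     /* Resync our cached reclock file name with the one configured in the
        daemon, covering the enable, disable and path-change cases.  Any
        stale file descriptor is closed so the lock can be re-acquired
        against the current file later. */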
3422 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3423 {
3424         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3425         const char *reclockfile;
3426
3427         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3428                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3429                 talloc_free(tmp_ctx);
3430                 return -1;      
3431         }
3432
3433         if (reclockfile == NULL) {
3434                 if (ctdb->recovery_lock_file != NULL) {
3435                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3436                         talloc_free(ctdb->recovery_lock_file);
3437                         ctdb->recovery_lock_file = NULL;
3438                         if (ctdb->recovery_lock_fd != -1) {
3439                                 close(ctdb->recovery_lock_fd);
3440                                 ctdb->recovery_lock_fd = -1;
3441                         }
3442                 }
3443                 ctdb->tunable.verify_recovery_lock = 0;
3444                 talloc_free(tmp_ctx);
3445                 return 0;
3446         }
3447
3448         if (ctdb->recovery_lock_file == NULL) {
3449                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3450                 if (ctdb->recovery_lock_fd != -1) {
3451                         close(ctdb->recovery_lock_fd);
3452                         ctdb->recovery_lock_fd = -1;
3453                 }
3454                 talloc_free(tmp_ctx);
3455                 return 0;
3456         }
3457
3458
3459         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3460                 talloc_free(tmp_ctx);
3461                 return 0;
3462         }
3463
3464         talloc_free(ctdb->recovery_lock_file);
3465         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3466         ctdb->tunable.verify_recovery_lock = 0;
3467         if (ctdb->recovery_lock_fd != -1) {
3468                 close(ctdb->recovery_lock_fd);
3469                 ctdb->recovery_lock_fd = -1;
3470         }
3471
3472         talloc_free(tmp_ctx);
3473         return 0;
3474 }
3475
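     /* A single pass of the monitoring logic.  Any early return abandons
        this iteration and the next pass of monitor_cluster() starts again
        from scratch. */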
3476 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3477                       TALLOC_CTX *mem_ctx)
3478 {
3479         uint32_t pnn;
3480         struct ctdb_node_map *nodemap=NULL;
3481         struct ctdb_node_map *recmaster_nodemap=NULL;
3482         struct ctdb_node_map **remote_nodemaps=NULL;
3483         struct ctdb_vnn_map *vnnmap=NULL;
3484         struct ctdb_vnn_map *remote_vnnmap=NULL;
3485         int32_t debug_level;
3486         int i, j, ret;
3487         bool self_ban;
3488
3489
3490         /* verify that the main daemon is still running */
3491         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3492                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3493                 exit(-1);
3494         }
3495
3496         /* ping the local daemon to tell it we are alive */
3497         ctdb_ctrl_recd_ping(ctdb);
3498
3499         if (rec->election_timeout) {
3500                 /* an election is in progress */
3501                 return;
3502         }
3503
3504         /* read the debug level from the parent and update locally */
3505         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3506         if (ret != 0) {
3507                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3508                 return;
3509         }
3510         LogLevel = debug_level;
3511
3512         /* get relevant tunables */
3513         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3514         if (ret != 0) {
3515                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3516                 return;
3517         }
3518
3519         /* get the current recovery lock file from the server */
3520         if (update_recovery_lock_file(ctdb) != 0) {
3521                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3522                 return;
3523         }
3524
3525         /* Make sure that if recovery lock verification becomes disabled,
3526            we close the file
3527         */
3528         if (ctdb->tunable.verify_recovery_lock == 0) {
3529                 if (ctdb->recovery_lock_fd != -1) {
3530                         close(ctdb->recovery_lock_fd);
3531                         ctdb->recovery_lock_fd = -1;
3532                 }
3533         }
3534
3535         pnn = ctdb_get_pnn(ctdb);
3536
3537         /* get the vnnmap */
3538         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3539         if (ret != 0) {
3540                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3541                 return;
3542         }
3543
3544
3545         /* get number of nodes */
3546         if (rec->nodemap) {
3547                 talloc_free(rec->nodemap);
3548                 rec->nodemap = NULL;
3549                 nodemap=NULL;
3550         }
3551         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3552         if (ret != 0) {
3553                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3554                 return;
3555         }
3556         nodemap = rec->nodemap;
3557
3558         /* remember our own node flags */
3559         rec->node_flags = nodemap->nodes[pnn].flags;
3560
3561         ban_misbehaving_nodes(rec, &self_ban);
3562         if (self_ban) {
3563                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3564                 return;
3565         }
3566
3567         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3568            also frozen and that the recmode is set to active.
3569         */
3570         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3571                 /* If this node has become inactive then we want to
3572                  * reduce the chances of it taking over the recovery
3573                  * master role when it becomes active again.  This
3574                  * helps to stabilise the recovery master role so that
3575                  * it stays on the most stable node.
3576                  */
3577                 rec->priority_time = timeval_current();
3578
3579                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3580                 if (ret != 0) {
3581                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3582                 }
3583                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3584                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3585
3586                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3587                         if (ret != 0) {
3588                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3589                                 return;
3590                         }
3591                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3592                         if (ret != 0) {
3593                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3594
3595                                 return;
3596                         }
3597                 }
3598
3599                 /* If this node is stopped or banned then it is not the recovery
3600                  * master, so don't do anything. This prevents a stopped or banned
3601                  * node from starting an election and sending unnecessary controls.
3602                  */
3603                 return;
3604         }
3605
3606         /* check which node is the recovery master */
3607         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3608         if (ret != 0) {
3609                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3610                 return;
3611         }
3612
3613         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3614         if (rec->recmaster != pnn) {
3615                 TALLOC_FREE(rec->reallocate_requests);
3616         }
3617
3618         /* This is a special case.  When the recovery daemon is started,
3619          * recmaster is set to -1.  A stopped or banned node has already
3620          * returned above, so start an election to decide the recovery master
3621          */
3622         if (rec->recmaster == (uint32_t)-1) {
3623                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master not yet set - forcing election\n"));
3624                 force_election(rec, pnn, nodemap);
3625                 return;
3626         }
3627
3628         /* update the capabilities for all nodes */
3629         ret = update_capabilities(ctdb, nodemap);
3630         if (ret != 0) {
3631                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3632                 return;
3633         }
3634
3635         /*
3636          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3637          * but we have, then force an election and try to become the new
3638          * recmaster.
3639          */
3640         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3641             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3642              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3643                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3644                                   " but we (node %u) have - force an election\n",
3645                                   rec->recmaster, pnn));
3646                 force_election(rec, pnn, nodemap);
3647                 return;
3648         }
3649
3650         /* count how many active nodes there are */
3651         rec->num_active    = 0;
3652         rec->num_connected = 0;
3653         for (i=0; i<nodemap->num; i++) {
3654                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3655                         rec->num_active++;
3656                 }
3657                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3658                         rec->num_connected++;
3659                 }
3660         }
3661
3662
3663         /* verify that the recmaster node is still active */
3664         for (j=0; j<nodemap->num; j++) {
3665                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3666                         break;
3667                 }
3668         }
3669
3670         if (j == nodemap->num) {
3671                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3672                 force_election(rec, pnn, nodemap);
3673                 return;
3674         }
3675
3676         /* if recovery master is disconnected we must elect a new recmaster */
3677         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3678                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3679                 force_election(rec, pnn, nodemap);
3680                 return;
3681         }
3682
3683         /* get nodemap from the recovery master to check if it is inactive */
3684         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3685                                    mem_ctx, &recmaster_nodemap);
3686         if (ret != 0) {
3687                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3688                           nodemap->nodes[j].pnn));
3689                 return;
3690         }
3691
3692
3693         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3694             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3695                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3696                 /*
3697                  * update our nodemap to carry the recmaster's notion of
3698                  * its own flags, so that we don't keep freezing the
3699                  * inactive recmaster node...
3700                  */
3701                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3702                 force_election(rec, pnn, nodemap);
3703                 return;
3704         }
3705
3706         /* verify that we have all ip addresses we should have and we dont
3707          * have addresses we shouldnt have.
3708          */ 
3709         if (ctdb->tunable.disable_ip_failover == 0 &&
3710             rec->takeover_runs_disable_ctx == NULL) {
3711                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3712                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3713                 }
3714         }
3715
3716
3717         /* if we are not the recmaster then we do not need to check
3718            if recovery is needed
3719          */
3720         if (pnn != rec->recmaster) {
3721                 return;
3722         }
3723
3724
3725         /* ensure our local copies of flags are right */
3726         ret = update_local_flags(rec, nodemap);
3727         if (ret == MONITOR_ELECTION_NEEDED) {
3728                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3729                 force_election(rec, pnn, nodemap);
3730                 return;
3731         }
3732         if (ret != MONITOR_OK) {
3733                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3734                 return;
3735         }
3736
3737         if (ctdb->num_nodes != nodemap->num) {
3738                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3739                 reload_nodes_file(ctdb);
3740                 return;
3741         }
3742
3743         /* verify that all active nodes agree that we are the recmaster */
3744         switch (verify_recmaster(rec, nodemap, pnn)) {
3745         case MONITOR_RECOVERY_NEEDED:
3746                 /* cannot happen */
3747                 return;
3748         case MONITOR_ELECTION_NEEDED:
3749                 force_election(rec, pnn, nodemap);
3750                 return;
3751         case MONITOR_OK:
3752                 break;
3753         case MONITOR_FAILED:
3754                 return;
3755         }
3756
3757
3758         if (rec->need_recovery) {
3759                 /* a previous recovery didn't finish */
3760                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3761                 return;
3762         }
3763
3764         /* verify that all active nodes are in normal mode 
3765            and not in recovery mode 
3766         */
3767         switch (verify_recmode(ctdb, nodemap)) {
3768         case MONITOR_RECOVERY_NEEDED:
3769                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3770                 return;
3771         case MONITOR_FAILED:
3772                 return;
3773         case MONITOR_ELECTION_NEEDED:
3774                 /* cannot happen */
3775         case MONITOR_OK:
3776                 break;
3777         }
3778
3779
3780         if (ctdb->tunable.verify_recovery_lock != 0) {
3781                 /* we should have the reclock - check it's not stale */
3782                 ret = check_recovery_lock(ctdb);
3783                 if (ret != 0) {
3784                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3785                         ctdb_set_culprit(rec, ctdb->pnn);
3786                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3787                         return;
3788                 }
3789         }
3790
3791
3792         /* if there are takeovers requested, perform it and notify the waiters */
3793         if (rec->takeover_runs_disable_ctx == NULL &&
3794             rec->reallocate_requests) {
3795                 process_ipreallocate_requests(ctdb, rec);
3796         }
3797
3798         /* get the nodemap for all active remote nodes
3799          */
3800         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3801         if (remote_nodemaps == NULL) {
3802                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3803                 return;
3804         }
3805         for(i=0; i<nodemap->num; i++) {
3806                 remote_nodemaps[i] = NULL;
3807         }
3808         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3809                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3810                 return;
3811         } 
3812
3813         /* verify that all other nodes have the same nodemap as we have
3814         */
3815         for (j=0; j<nodemap->num; j++) {
3816                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3817                         continue;
3818                 }
3819
3820                 if (remote_nodemaps[j] == NULL) {
3821                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3822                         ctdb_set_culprit(rec, j);
3823
3824                         return;
3825                 }
3826
3827                 /* if the nodes disagree on how many nodes there are
3828                    then this is a good reason to try recovery
3829                  */
3830                 if (remote_nodemaps[j]->num != nodemap->num) {
3831                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3832                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3833                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3834                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3835                         return;
3836                 }
3837
3838                 /* if the nodes disagree on which nodes exist and are
3839                    active, then that is also a good reason to do recovery
3840                  */
3841                 for (i=0;i<nodemap->num;i++) {
3842                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3843                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3844                                           nodemap->nodes[j].pnn, i, 
3845                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3846                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3847                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3848                                             vnnmap);
3849                                 return;
3850                         }
3851                 }
3852         }
3853
3854         /*
3855          * Update node flags obtained from each active node. This ensures we have
3856          * up-to-date information for all the nodes.
3857          */
3858         for (j=0; j<nodemap->num; j++) {
3859                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3860                         continue;
3861                 }
3862                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3863         }
3864
3865         for (j=0; j<nodemap->num; j++) {
3866                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3867                         continue;
3868                 }
3869
3870                 /* verify the flags are consistent
3871                 */
3872                 for (i=0; i<nodemap->num; i++) {
3873                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3874                                 continue;
3875                         }
3876                         
3877                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3878                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3879                                   nodemap->nodes[j].pnn, 
3880                                   nodemap->nodes[i].pnn, 
3881                                   remote_nodemaps[j]->nodes[i].flags,
3882                                   nodemap->nodes[i].flags));
3883                                 if (i == j) {
3884                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3885                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3886                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3887                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3888                                                     vnnmap);
3889                                         return;
3890                                 } else {
3891                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3892                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3893                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3894                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3895                                                     vnnmap);
3896                                         return;
3897                                 }
3898                         }
3899                 }
3900         }
3901
3902
3903         /* there better be the same number of lmasters in the vnn map
3904            as there are active nodes or we will have to do a recovery
3905          */
3906         if (vnnmap->size != rec->num_active) {
3907                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3908                           vnnmap->size, rec->num_active));
3909                 ctdb_set_culprit(rec, ctdb->pnn);
3910                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3911                 return;
3912         }
3913
3914         /* verify that all active nodes in the nodemap also exist in 
3915            the vnnmap.
3916          */
3917         for (j=0; j<nodemap->num; j++) {
3918                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3919                         continue;
3920                 }
3921                 if (nodemap->nodes[j].pnn == pnn) {
3922                         continue;
3923                 }
3924
3925                 for (i=0; i<vnnmap->size; i++) {
3926                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3927                                 break;
3928                         }
3929                 }
3930                 if (i == vnnmap->size) {
3931                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3932                                   nodemap->nodes[j].pnn));
3933                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3934                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3935                         return;
3936                 }
3937         }
3938
3939         
3940         /* verify that all other nodes have the same vnnmap
3941            and are from the same generation
3942          */
3943         for (j=0; j<nodemap->num; j++) {
3944                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3945                         continue;
3946                 }
3947                 if (nodemap->nodes[j].pnn == pnn) {
3948                         continue;
3949                 }
3950
3951                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3952                                           mem_ctx, &remote_vnnmap);
3953                 if (ret != 0) {
3954                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3955                                   nodemap->nodes[j].pnn));
3956                         return;
3957                 }
3958
3959                 /* verify the vnnmap generation is the same */
3960                 if (vnnmap->generation != remote_vnnmap->generation) {
3961                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3962                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3963                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3964                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3965                         return;
3966                 }
3967
3968                 /* verify the vnnmap size is the same */
3969                 if (vnnmap->size != remote_vnnmap->size) {
3970                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3971                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3972                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3973                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3974                         return;
3975                 }
3976
3977                 /* verify the vnnmap is the same */
3978                 for (i=0;i<vnnmap->size;i++) {
3979                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3980                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3981                                           nodemap->nodes[j].pnn));
3982                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3983                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3984                                             vnnmap);
3985                                 return;
3986                         }
3987                 }
3988         }
3989
3990         /* we might need to change who has what IP assigned */
3991         if (rec->need_takeover_run) {
3992                 uint32_t culprit = (uint32_t)-1;
3993
3994                 rec->need_takeover_run = false;
3995
3996                 /* update the list of public ips that a node can handle for
3997                    all connected nodes
3998                 */
3999                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4000                 if (ret != 0) {
4001                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4002                                          culprit));
4003                         rec->need_takeover_run = true;
4004                         return;
4005                 }
4006
4007                 /* execute the "startrecovery" event script on all nodes */
4008                 ret = run_startrecovery_eventscript(rec, nodemap);
4009                 if (ret!=0) {
4010                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4011                         ctdb_set_culprit(rec, ctdb->pnn);
4012                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4013                         return;
4014                 }
4015
4016                 /* If the takeover run fails, the offending nodes are
4017                  * assigned ban culprit counts and the takeover is retried.
4018                  * If it keeps failing, the offending node will eventually
4019                  * be banned.
4020                  *
4021                  * If rec->need_takeover_run is not set to true at this
4022                  * failure, monitoring is disabled cluster-wide (via
4023                  * startrecovery eventscript) and will not get enabled.
4024                  */
4025                 if (!do_takeover_run(rec, nodemap, true)) {
4026                         return;
4027                 }
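
/* Illustrative sketch (an assumption based on the comment above, not a
   copy of do_takeover_run() internals): on failure the flag is presumably
   re-armed inside do_takeover_run() so the next main_loop iteration
   retries, roughly like this.  takeover_failed and offending_pnn are
   hypothetical names. */
#if 0
		if (takeover_failed) {
			rec->need_takeover_run = true;	/* retry next iteration */
			ctdb_set_culprit(rec, offending_pnn);	/* repeated failures lead to a ban */
		}
#endif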

		/* execute the "recovered" event script on all nodes */
		ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
#if 0
/* We can't check whether the event completed successfully, since this
   script WILL fail if the node is in recovery mode.  If that race
   happens, the code here would just trigger a second, cascading
   recovery. */
		if (ret != 0) {
			DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
			ctdb_set_culprit(rec, ctdb->pnn);
			do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
		}
#endif
	}
}

/*
  the main monitoring loop
 */
static void monitor_cluster(struct ctdb_context *ctdb)
{
	struct ctdb_recoverd *rec;

	DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));

	rec = talloc_zero(ctdb, struct ctdb_recoverd);
	CTDB_NO_MEMORY_FATAL(ctdb, rec);

	rec->ctdb = ctdb;

	rec->takeover_run_in_progress = false;

	rec->priority_time = timeval_current();

	/* register a message port for sending memory dumps */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);

	/* register a message port for requesting logs */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);

	/* register a message port for clearing logs */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);

	/* register a message port for recovery elections */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);

	/* when nodes are disabled/enabled */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);

	/* when we are asked to push out a flag change */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);

	/* register a message port for vacuum fetch */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);

	/* register a message port for reloadnodes */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);

	/* register a message port for performing a takeover run */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);

	/* register a message port for disabling the ip check for a short while */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);

	/* register a message port for updating the recovery daemon's node assignment for an ip */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);

	/* register a message port for forcing a rebalance of a node at the
	   next reallocation */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);

	/* register a message port for disabling takeover runs */
	ctdb_client_set_message_handler(ctdb,
					CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
					disable_takeover_runs_handler, rec);

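/* Illustrative sketch (not part of the original): how a client could ask
   for a takeover run via the CTDB_SRVID_TAKEOVER_RUN port registered
   above.  It follows the srvid_request conventions used in this file:
   request.srvid == 0 means "no reply wanted".  recmaster_pnn is assumed
   to be the recovery master's node number. */
#if 0
static int example_request_takeover_run(struct ctdb_context *ctdb,
					uint32_t recmaster_pnn)
{
	struct srvid_request request;
	TDB_DATA data;

	request.pnn = ctdb_get_pnn(ctdb);	/* a reply would come to us... */
	request.srvid = 0;			/* ...but 0 asks for no reply */

	data.dptr = (uint8_t *)&request;
	data.dsize = sizeof(request);

	return ctdb_client_send_message(ctdb, recmaster_pnn,
					CTDB_SRVID_TAKEOVER_RUN, data);
}
#endif
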
	for (;;) {
		TALLOC_CTX *mem_ctx = talloc_new(ctdb);
		struct timeval start;
		double elapsed;

		if (!mem_ctx) {
			DEBUG(DEBUG_CRIT,(__location__
					  " Failed to create temp context\n"));
			exit(-1);
		}

		start = timeval_current();
		main_loop(ctdb, rec, mem_ctx);
		talloc_free(mem_ctx);

		/* we only check for recovery once every RecoverInterval
		   seconds (1 second by default); e.g. if main_loop() took
		   0.3s we sleep for the remaining 0.7s */
		elapsed = timeval_elapsed(&start);
		if (elapsed < ctdb->tunable.recover_interval) {
			ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
					  - elapsed);
		}
	}
}

/*
  event handler for when the main ctdbd dies
 */
static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
				 uint16_t flags, void *private_data)
{
	DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
	_exit(1);
}

/*
  called regularly to verify that the recovery daemon is still running
 */
static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
			    struct timeval yt, void *p)
{
	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);

	if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
		DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));

		event_add_timed(ctdb->ev, ctdb, timeval_zero(),
				ctdb_restart_recd, ctdb);

		return;
	}

	event_add_timed(ctdb->ev, ctdb->recd_ctx,
			timeval_current_ofs(30, 0),
			ctdb_check_recd, ctdb);
}
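
/* Illustrative sketch (standard POSIX idiom, presumably what
   ctdb_kill(..., 0) builds on above): signal 0 performs all of kill()'s
   error checking without delivering anything, so it can probe whether a
   pid still exists. */
#if 0
#include <signal.h>
#include <errno.h>
#include <stdbool.h>

static bool pid_is_alive(pid_t pid)
{
	if (kill(pid, 0) == 0) {
		return true;		/* process exists */
	}
	return errno == EPERM;		/* it exists but belongs to another user */
}
#endif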

static void recd_sig_child_handler(struct event_context *ev,
	struct signal_event *se, int signum, int count,
	void *dont_care,
	void *private_data)
{
	int status;
	pid_t pid = -1;

	/* reap all exited children without blocking */
	while (pid != 0) {
		pid = waitpid(-1, &status, WNOHANG);
		if (pid == -1) {
			if (errno != ECHILD) {
				DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno), errno));
			}
			return;
		}
		if (pid > 0) {
			DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
		}
	}
}
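
/* Illustrative sketch (the standard waitpid() reaping idiom the handler
   above uses): WNOHANG makes the loop collect every exited child without
   blocking, and the status word can be decoded with WIFEXITED() and
   friends. */
#if 0
#include <sys/types.h>
#include <sys/wait.h>

static void reap_children(void)
{
	int status;
	pid_t pid;

	while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
		if (WIFEXITED(status)) {
			/* normal exit; code is WEXITSTATUS(status) */
		} else if (WIFSIGNALED(status)) {
			/* killed by signal WTERMSIG(status) */
		}
	}
}
#endif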

/*
  start up the recovery daemon as a child of the main ctdb daemon
 */
int ctdb_start_recoverd(struct ctdb_context *ctdb)
{
	int fd[2];
	struct signal_event *se;
	struct tevent_fd *fde;

	if (pipe(fd) != 0) {
		return -1;
	}

	ctdb->ctdbd_pid = getpid();

	ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
	if (ctdb->recoverd_pid == -1) {
		return -1;
	}

	if (ctdb->recoverd_pid != 0) {
		/* parent: keep the write end of the pipe and arrange to
		   check on the child every 30 seconds */
		talloc_free(ctdb->recd_ctx);
		ctdb->recd_ctx = talloc_new(ctdb);
		CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);

		close(fd[0]);
		event_add_timed(ctdb->ev, ctdb->recd_ctx,
				timeval_current_ofs(30, 0),
				ctdb_check_recd, ctdb);
		return 0;
	}

	close(fd[1]);

	srandom(getpid() ^ time(NULL));

	/* Clear the log ringbuffer */
	ctdb_clear_log(ctdb);

	ctdb_set_process_name("ctdb_recoverd");
	if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
		DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
		exit(1);
	}

	DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));

	fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
			   ctdb_recoverd_parent, &fd[0]);
	tevent_fd_set_auto_close(fde);

	/* set up a handler to pick up sigchld */
	se = event_add_signal(ctdb->ev, ctdb,
			      SIGCHLD, 0,
			      recd_sig_child_handler,
			      ctdb);
	if (se == NULL) {
		DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
		exit(1);
	}

	monitor_cluster(ctdb);

	DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
	return -1;
}
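
/* Illustrative sketch (the pattern ctdb_start_recoverd() uses above): the
   parent keeps the write end of a pipe open and never writes; the child
   watches the read end.  When the parent dies the kernel closes the write
   end, the read end becomes readable with EOF, and the child's fd handler
   (ctdb_recoverd_parent() here) can exit.  fork_with_parent_watch() is a
   hypothetical name. */
#if 0
#include <unistd.h>

static pid_t fork_with_parent_watch(int *watch_fd)
{
	int fd[2];
	pid_t pid;

	if (pipe(fd) != 0) {
		return -1;
	}
	pid = fork();
	if (pid < 0) {
		return -1;		/* fork failed */
	}
	if (pid == 0) {
		close(fd[1]);		/* child keeps the read end */
		*watch_fd = fd[0];	/* EOF here means the parent died */
	} else {
		close(fd[0]);		/* parent keeps the write end open */
	}
	return pid;
}
#endif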

/*
  shut down the recovery daemon
 */
void ctdb_stop_recoverd(struct ctdb_context *ctdb)
{
	if (ctdb->recoverd_pid == 0) {
		return;
	}

	DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
	ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);

	TALLOC_FREE(ctdb->recd_ctx);
	TALLOC_FREE(ctdb->recd_ping_count);
}

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te,
			      struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
	ctdb_stop_recoverd(ctdb);
	ctdb_start_recoverd(ctdb);
}