ctdb-recoverd: Detach database from recovery daemon
ctdb/server/ctdb_recoverd.c (obnox/samba/samba-obnox.git)
/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "db_wrap.h"
#include "dlinklist.h"


/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};
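
/* In outline: a client that wants to be told when some operation run
 * by the recovery master (e.g. a takeover run) completes sends a
 * struct srvid_request carrying its pnn and a srvid it listens on.
 * The request is queued on one of these lists and, once the operation
 * finishes, the result is messaged back to that pnn/srvid.  By
 * convention a srvid of 0 means "no reply wanted".
 */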

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct srvid_request *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct srvid_request *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct srvid_request *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_lmasters;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
        struct srvid_requests *reallocate_requests;
        bool takeover_run_in_progress;
        TALLOC_CTX *takeover_runs_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
        uint32_t *force_rebalance_nodes;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
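
/* Both timeouts are driven by tunables (ctdb->tunable.recover_timeout
 * and recover_interval), so they can be adjusted at runtime with
 * "ctdb setvar" rather than being compile-time constants.
 */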

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the troublemaker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved, so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

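/* Elsewhere in the recovery daemon these per-node "credits" are
 * compared against a ban threshold: a node that keeps misbehaving
 * accumulates credits until it is banned via ctdb_ban_node(), while
 * recovery_grace_period seconds of good behaviour wipe the slate
 * clean, as seen above.
 */
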
/*
  remember the troublemaker, adding a single credit
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}
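
/* In outline, the "startrecovery" and "recovered" events bracket every
 * recovery: the former fires before the databases are rebuilt and the
 * latter once the cluster is consistent again, giving the event
 * scripts on each node a chance to react.  A node that fails either
 * event is marked as a culprit.
 */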

static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
                DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
                ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
        }

        if (node_pnn == ctdb->pnn) {
                ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
        }
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
{
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}
/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
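        /* When entering recovery, databases are frozen one priority
         * level at a time (1..NUM_DB_PRIORITIES) rather than all at
         * once; a node that fails to freeze earns nodemap->num culprit
         * credits (see set_recmode_fail_callback above), so a stuck
         * node is banned quickly.
         */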
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* update all remote nodes to use the same db priority that we have.
   This can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map *nodemap,
        uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].dbid;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority));

                ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
                                                CTDB_CURRENT_NODE, &db_prio);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
                                         db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;


                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].dbid, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}
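
/* Together with create_missing_local_databases() below, this makes the
 * set of attached databases identical on every active node before the
 * database contents are rebuilt.
 */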


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
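                        /* Keep the pulled copy only if it is newer: a
                         * strictly higher RSN always wins, and on a
                         * tie the pulled copy wins when the stored
                         * copy's dmaster is not the recovery master.
                         */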
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}


struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum ||
            (cb_data->pnn == -1 && seqnum == 0)) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}
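
/* For persistent databases the whole database can be taken from the
 * single node with the highest sequence number instead of being merged
 * record by record; see the long comment in traverse_recdb() below for
 * why record-by-record recovery can lose data in persistent databases.
 */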


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }
        }

        return 0;
}


struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
};
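
/* In outline, each vacuum_info tracks one batch of records that a
 * remote node handed to this node during vacuuming; the batch is
 * worked through one record at a time, with non-blocking chainlocks
 * and asynchronous calls, so the recovery daemon never stalls here.
 */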

static void vacuum_fetch_next(struct vacuum_info *v);

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
        talloc_free(state);
        vacuum_fetch_next(v);
}


/*
  process the next element from the vacuum list
*/
static void vacuum_fetch_next(struct vacuum_info *v)
{
        struct ctdb_call call;
        struct ctdb_rec_data *r;

        while (v->recs->count) {
                struct ctdb_client_call_state *state;
                TDB_DATA data;
                struct ctdb_ltdb_header *hdr;

                ZERO_STRUCT(call);
                call.call_id = CTDB_NULL_FUNC;
                call.flags = CTDB_IMMEDIATE_MIGRATION;
                call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
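                /* A CTDB_NULL_FUNC call does no work of its own; with
                 * CTDB_IMMEDIATE_MIGRATION set it is issued purely for
                 * the side effect of migrating the record to this node.
                 */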

                r = v->r;
                v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
                v->recs->count--;

                call.key.dptr = &r->data[0];
                call.key.dsize = r->keylen;

                /* ensure we don't block this daemon - just skip a record if we can't get
                   the chainlock */
                if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
                        continue;
                }

                data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
                if (data.dptr == NULL) {
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                hdr = (struct ctdb_ltdb_header *)data.dptr;
                if (hdr->dmaster == v->rec->ctdb->pnn) {
                        /* it's already local */
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                free(data.dptr);

                state = ctdb_call_send(v->ctdb_db, &call);
                tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                if (state == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                        talloc_free(v);
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
                state->async.private_data = v;
                return;
        }

        talloc_free(v);
}


/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
        DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data *r;
        uint32_t srcnode;
        struct vacuum_info *v;

        recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];

        if (recs->count == 0) {
                talloc_free(tmp_ctx);
                return;
        }

        srcnode = r->reqid;

        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
                        talloc_free(tmp_ctx);
                        return;
                }
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                talloc_free(tmp_ctx);
                return;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].dbid == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
                return;
        }

        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(tmp_ctx);
                return;
        }

        v->rec = rec;
        v->srcnode = srcnode;
        v->ctdb_db = ctdb_db;
        v->recs = talloc_memdup(v, recs, data.dsize);
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
                talloc_free(tmp_ctx);
                return;
        }
        v->r =  (struct ctdb_rec_data *)&v->recs->data[0];

        DLIST_ADD(rec->vacuum_info, v);

        talloc_set_destructor(v, vacuum_info_destructor);

        vacuum_fetch_next(v);
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                    TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data,
                                                    struct ctdb_recoverd);
        uint32_t db_id;
        struct vacuum_info *v, *vnext;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        /* Stop any active vacuum fetch */
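        /* (If we did not, the in-flight ctdb_call chain in
         * vacuum_fetch_next() would keep using the ctdb_db context we
         * are about to free.)
         */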
        v = rec->vacuum_info;
        while (v != NULL) {
                vnext = v->next;

                if (v->ctdb_db->db_id == db_id) {
                        talloc_free(v);
                }
                v = vnext;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
}
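
/* e.g. ctdb_wait_timeout(ctdb, 0.2) spins the event loop for roughly
 * 200 milliseconds: the fractional part of secs is converted to
 * microseconds above.
 */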

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                event_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or believe we are, the recovery master.
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return MONITOR_FAILED;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return MONITOR_OK;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}
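
/* In outline, the generation id identifies the recovery "epoch" the
 * cluster is in; packets and records stamped with an old generation
 * can be recognised and rejected after the next recovery.
 * INVALID_GENERATION is reserved to mean "no valid generation".
 */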


/*
  create a temporary working database
 */
static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
{
        char *name;
        struct tdb_wrap *recdb;
        unsigned tdb_flags;

        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
                               ctdb->db_directory_state,
                               ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);

        tdb_flags = TDB_NOLOCK;
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);

        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size,
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }

        talloc_free(name);

        return recdb;
}
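
/* The recdb is private to this recovery daemon, which is why TDB_NOLOCK
 * is safe here; it is recreated from scratch (the unlink() above) for
 * every recovery and only lives long enough to merge the best copy of
 * each record before the result is pushed back out to all nodes.
 */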


/*
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
        bool persistent;
};

static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct recdb_data *params = (struct recdb_data *)p;
        struct ctdb_rec_data *rec;
        struct ctdb_ltdb_header *hdr;

        /*
         * skip empty records - but NOT for persistent databases:
         *
         * The record-by-record mode of recovery deletes empty records.
         * For persistent databases, this can lead to data corruption
         * by deleting records that should be there:
         *
         * - Assume the cluster has been running for a while.
         *
         * - A record R in a persistent database has been created and
         *   deleted a couple of times, the last operation being deletion,
         *   leaving an empty record with a high RSN, say 10.
         *
         * - Now a node N is turned off.
         *
         * - This leaves the local database copy on N with the empty
         *   copy of R and RSN 10. On all other nodes, the recovery has deleted
         *   the copy of record R.
         *
         * - Now the record is created again while node N is turned off.
         *   This creates R with RSN = 1 on all nodes except for N.
         *
         * - Now node N is turned on again. The following recovery will choose
         *   the older empty copy of R due to RSN 10 > RSN 1.
         *
         * ==> Hence the record is gone after the recovery.
         *
         * On databases like Samba's registry, this can damage the higher-level
         * data structures built from the various tdb-level records.
         */
        if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
                return 0;
        }

        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (!params->persistent) {
                hdr->dmaster = params->ctdb->pnn;
                hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
        }

        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
        if (rec == NULL) {
                params->failed = true;
                return -1;
        }
        if (params->len + rec->length >= params->allocated_len) {
                params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
                         rec->length + params->len));
                params->failed = true;
                return -1;
        }
        params->recdata->count++;
        memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
        params->len += rec->length;
        talloc_free(rec);

        return 0;
}
1385
1386 /*
1387   push the recdb database out to all nodes
1388  */
1389 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1390                                bool persistent,
1391                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1392 {
1393         struct recdb_data params;
1394         struct ctdb_marshall_buffer *recdata;
1395         TDB_DATA outdata;
1396         TALLOC_CTX *tmp_ctx;
1397         uint32_t *nodes;
1398
1399         tmp_ctx = talloc_new(ctdb);
1400         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1401
1402         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1403         CTDB_NO_MEMORY(ctdb, recdata);
1404
1405         recdata->db_id = dbid;
1406
1407         params.ctdb = ctdb;
1408         params.recdata = recdata;
1409         params.len = offsetof(struct ctdb_marshall_buffer, data);
1410         params.allocated_len = params.len;
1411         params.failed = false;
1412         params.persistent = persistent;
1413
1414         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1415                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1416                 talloc_free(params.recdata);
1417                 talloc_free(tmp_ctx);
1418                 return -1;
1419         }
1420
1421         if (params.failed) {
1422                 DEBUG(DEBUG_ERR,(__location__ " Failed to marshall recdb records\n"));
1423                 talloc_free(params.recdata);
1424                 talloc_free(tmp_ctx);
1425                 return -1;              
1426         }
1427
1428         recdata = params.recdata;
1429
1430         outdata.dptr = (void *)recdata;
1431         outdata.dsize = params.len;
1432
1433         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1434         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1435                                         nodes, 0,
1436                                         CONTROL_TIMEOUT(), false, outdata,
1437                                         NULL, NULL,
1438                                         NULL) != 0) {
1439                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1440                 talloc_free(recdata);
1441                 talloc_free(tmp_ctx);
1442                 return -1;
1443         }
1444
1445         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1446                   dbid, recdata->count));
1447
1448         talloc_free(recdata);
1449         talloc_free(tmp_ctx);
1450
1451         return 0;
1452 }
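
/*
 * Note: outdata.dsize above is params.len, so only the used prefix of
 * the (possibly over-preallocated, see pulldb_preallocation_size) blob
 * is sent to the other nodes, never the full allocated_len.
 */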
1453
1454
1455 /*
1456   go through a full recovery on one database 
1457  */
1458 static int recover_database(struct ctdb_recoverd *rec, 
1459                             TALLOC_CTX *mem_ctx,
1460                             uint32_t dbid,
1461                             bool persistent,
1462                             uint32_t pnn, 
1463                             struct ctdb_node_map *nodemap,
1464                             uint32_t transaction_id)
1465 {
1466         struct tdb_wrap *recdb;
1467         int ret;
1468         struct ctdb_context *ctdb = rec->ctdb;
1469         TDB_DATA data;
1470         struct ctdb_control_wipe_database w;
1471         uint32_t *nodes;
1472
1473         recdb = create_recdb(ctdb, mem_ctx);
1474         if (recdb == NULL) {
1475                 return -1;
1476         }
1477
1478         /* pull all remote databases onto the recdb */
1479         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1480         if (ret != 0) {
1481                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1482                 return -1;
1483         }
1484
1485         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1486
1487         /* wipe all the remote databases. This is safe as we are in a transaction */
1488         w.db_id = dbid;
1489         w.transaction_id = transaction_id;
1490
1491         data.dptr = (void *)&w;
1492         data.dsize = sizeof(w);
1493
1494         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1495         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1496                                         nodes, 0,
1497                                         CONTROL_TIMEOUT(), false, data,
1498                                         NULL, NULL,
1499                                         NULL) != 0) {
1500                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1501                 talloc_free(recdb);
1502                 return -1;
1503         }
1504         
1505         /* push out the correct database. This sets the dmaster and skips 
1506            the empty records */
1507         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1508         if (ret != 0) {
1509                 talloc_free(recdb);
1510                 return -1;
1511         }
1512
1513         /* all done with this database */
1514         talloc_free(recdb);
1515
1516         return 0;
1517 }
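
/*
 * Recap of the per-database sequence above, all inside the transaction
 * identified by transaction_id:
 *
 *   1. create_recdb()         - temporary local tdb to merge into
 *   2. pull_remote_database() - collect records from all active nodes
 *                               (highest RSN wins, see the RSN
 *                               discussion near traverse_recdb())
 *   3. CTDB_CONTROL_WIPE_DATABASE - empty the db on all active nodes
 *   4. push_recdb_database()  - broadcast the merged records with the
 *                               dmaster rewritten to this node
 */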
1518
1519 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1520                                          struct ctdb_recoverd *rec,
1521                                          struct ctdb_node_map *nodemap,
1522                                          uint32_t *culprit)
1523 {
1524         int j;
1525         int ret;
1526
1527         if (ctdb->num_nodes != nodemap->num) {
1528                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1529                                   ctdb->num_nodes, nodemap->num));
1530                 if (culprit) {
1531                         *culprit = ctdb->pnn;
1532                 }
1533                 return -1;
1534         }
1535
1536         for (j=0; j<nodemap->num; j++) {
1537                 /* For readability */
1538                 struct ctdb_node *node = ctdb->nodes[j];
1539
1540                 /* release any existing data */
1541                 if (node->known_public_ips) {
1542                         talloc_free(node->known_public_ips);
1543                         node->known_public_ips = NULL;
1544                 }
1545                 if (node->available_public_ips) {
1546                         talloc_free(node->available_public_ips);
1547                         node->available_public_ips = NULL;
1548                 }
1549
1550                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1551                         continue;
1552                 }
1553
1554                 /* Retrieve the list of known public IPs from the node */
1555                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1556                                         CONTROL_TIMEOUT(),
1557                                         node->pnn,
1558                                         ctdb->nodes,
1559                                         0,
1560                                         &node->known_public_ips);
1561                 if (ret != 0) {
1562                         DEBUG(DEBUG_ERR,
1563                               ("Failed to read known public IPs from node: %u\n",
1564                                node->pnn));
1565                         if (culprit) {
1566                                 *culprit = node->pnn;
1567                         }
1568                         return -1;
1569                 }
1570
1571                 if (ctdb->do_checkpublicip &&
1572                     rec->takeover_runs_disable_ctx == NULL &&
1573                     verify_remote_ip_allocation(ctdb,
1574                                                  node->known_public_ips,
1575                                                  node->pnn)) {
1576                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1577                         rec->need_takeover_run = true;
1578                 }
1579
1580                 /* Retrieve the list of available public IPs from the node */
1581                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1582                                         CONTROL_TIMEOUT(),
1583                                         node->pnn,
1584                                         ctdb->nodes,
1585                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1586                                         &node->available_public_ips);
1587                 if (ret != 0) {
1588                         DEBUG(DEBUG_ERR,
1589                               ("Failed to read available public IPs from node: %u\n",
1590                                node->pnn));
1591                         if (culprit) {
1592                                 *culprit = node->pnn;
1593                         }
1594                         return -1;
1595                 }
1596         }
1597
1598         return 0;
1599 }
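
/*
 * The two fetches above differ only in the flags argument: 0 returns
 * every public IP the node knows about (node->known_public_ips), while
 * CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE narrows this to the addresses
 * the node can currently host (node->available_public_ips).
 */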
1600
1601 /* when we start a recovery, make sure all nodes use the same reclock file
1602    setting
1603 */
1604 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1605 {
1606         struct ctdb_context *ctdb = rec->ctdb;
1607         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1608         TDB_DATA data;
1609         uint32_t *nodes;
1610
1611         if (ctdb->recovery_lock_file == NULL) {
1612                 data.dptr  = NULL;
1613                 data.dsize = 0;
1614         } else {
1615                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1616                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1617         }
1618
1619         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1620         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1621                                         nodes, 0,
1622                                         CONTROL_TIMEOUT(),
1623                                         false, data,
1624                                         NULL, NULL,
1625                                         rec) != 0) {
1626                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1627                 talloc_free(tmp_ctx);
1628                 return -1;
1629         }
1630
1631         talloc_free(tmp_ctx);
1632         return 0;
1633 }
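
/*
 * Note: when no recovery lock file is configured, the broadcast above
 * carries an empty payload (dsize == 0), so the same control can also
 * clear a stale reclock setting on the remote nodes.
 */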
1634
1635
1636 /*
1637  * this callback is called for every node that failed to execute
1638  * ctdb_takeover_run(); it marks that node as a culprit, giving it a banning credit.
1639  */
1640 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1641 {
1642         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1643
1644         if (callback_data != NULL) {
1645                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1646
1647                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1648
1649                 ctdb_set_culprit(rec, node_pnn);
1650         }
1651 }
1652
1653
1654 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1655 {
1656         struct ctdb_context *ctdb = rec->ctdb;
1657         int i;
1658         struct ctdb_banning_state *ban_state;
1659
1660         *self_ban = false;
1661         for (i=0; i<ctdb->num_nodes; i++) {
1662                 if (ctdb->nodes[i]->ban_state == NULL) {
1663                         continue;
1664                 }
1665                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1666                 if (ban_state->count < 2*ctdb->num_nodes) {
1667                         continue;
1668                 }
1669
1670                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1671                         ctdb->nodes[i]->pnn, ban_state->count,
1672                         ctdb->tunable.recovery_ban_period));
1673                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1674                 ban_state->count = 0;
1675
1676                 /* Banning ourself? */
1677                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1678                         *self_ban = true;
1679                 }
1680         }
1681 }
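
/*
 * Worked example of the threshold above: in a 4-node cluster a node is
 * banned once its banning credits reach 2 * 4 = 8, i.e. after being
 * blamed as recovery culprit eight times.  The count is reset on ban,
 * so a node returning from a ban starts from a clean slate.
 */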
1682
1683 static bool do_takeover_run(struct ctdb_recoverd *rec,
1684                             struct ctdb_node_map *nodemap,
1685                             bool banning_credits_on_fail)
1686 {
1687         uint32_t *nodes = NULL;
1688         struct srvid_request_data dtr;
1689         TDB_DATA data;
1690         int i;
1691         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1692         int ret;
1693         bool ok;
1694
1695         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1696
1697         if (rec->takeover_run_in_progress) {
1698                 DEBUG(DEBUG_ERR, (__location__
1699                                   " takeover run already in progress\n"));
1700                 ok = false;
1701                 goto done;
1702         }
1703
1704         rec->takeover_run_in_progress = true;
1705
1706         /* If takeover runs are disabled then fail... */
1707         if (rec->takeover_runs_disable_ctx != NULL) {
1708                 DEBUG(DEBUG_ERR,
1709                       ("Takeover runs are disabled so refusing to run one\n"));
1710                 ok = false;
1711                 goto done;
1712         }
1713
1714         /* Disable IP checks (takeover runs, really) on other nodes
1715          * while doing this takeover run.  This will stop those other
1716          * nodes from triggering takeover runs when they think they should
1717          * be hosting an IP but it isn't yet on an interface.  Don't
1718          * wait for replies since a failure here might cause some
1719          * noise in the logs but will not actually cause a problem.
1720          */
1721         dtr.srvid = 0; /* No reply */
1722         dtr.pnn = -1;
1723
1724         data.dptr  = (uint8_t*)&dtr;
1725         data.dsize = sizeof(dtr);
1726
1727         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1728
1729         /* Disable for 60 seconds.  This can be a tunable later if
1730          * necessary.
1731          */
1732         dtr.data = 60;
1733         for (i = 0; i < talloc_array_length(nodes); i++) {
1734                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1735                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1736                                              data) != 0) {
1737                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1738                 }
1739         }
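
        /*
         * Sketch of the message sent above (and again with data = 0 in
         * the reenable loop below): a struct srvid_request_data with
         * pnn = -1 and srvid = 0 (no reply wanted), whose data field
         * carries the disable timeout in seconds.  The receiving side
         * is disable_takeover_runs_handler() further down.
         */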
1740
1741         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1742                                 rec->force_rebalance_nodes,
1743                                 takeover_fail_callback,
1744                                 banning_credits_on_fail ? rec : NULL);
1745
1746         /* Reenable takeover runs and IP checks on other nodes */
1747         dtr.data = 0;
1748         for (i = 0; i < talloc_array_length(nodes); i++) {
1749                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1750                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1751                                              data) != 0) {
1752                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1753                 }
1754         }
1755
1756         if (ret != 0) {
1757                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1758                 ok = false;
1759                 goto done;
1760         }
1761
1762         ok = true;
1763         /* Takeover run was successful so clear force rebalance targets */
1764         if (rebalance_nodes == rec->force_rebalance_nodes) {
1765                 TALLOC_FREE(rec->force_rebalance_nodes);
1766         } else {
1767                 DEBUG(DEBUG_WARNING,
1768                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1769         }
1770 done:
1771         rec->need_takeover_run = !ok;
1772         talloc_free(nodes);
1773         rec->takeover_run_in_progress = false;
1774
1775         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1776         return ok;
1777 }
1778
1779
1780 /*
1781   we are the recmaster, and recovery is needed - start a recovery run
1782  */
1783 static int do_recovery(struct ctdb_recoverd *rec, 
1784                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1785                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1786 {
1787         struct ctdb_context *ctdb = rec->ctdb;
1788         int i, j, ret;
1789         uint32_t generation;
1790         struct ctdb_dbid_map *dbmap;
1791         TDB_DATA data;
1792         uint32_t *nodes;
1793         struct timeval start_time;
1794         uint32_t culprit = (uint32_t)-1;
1795         bool self_ban;
1796
1797         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1798
1799         /* if recovery fails, force it again */
1800         rec->need_recovery = true;
1801
1802         ban_misbehaving_nodes(rec, &self_ban);
1803         if (self_ban) {
1804                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1805                 return -1;
1806         }
1807
1808         if (ctdb->tunable.verify_recovery_lock != 0) {
1809                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1810                 start_time = timeval_current();
1811                 if (!ctdb_recovery_lock(ctdb, true)) {
1812                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1813                                          "and banning ourselves for %u seconds\n",
1814                                          ctdb->tunable.recovery_ban_period));
1815                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1816                         return -1;
1817                 }
1818                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1819                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1820         }
1821
1822         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1823
1824         /* get a list of all databases */
1825         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1826         if (ret != 0) {
1827                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1828                 return -1;
1829         }
1830
1831         /* we do the db creation before we set the recovery mode, so the freeze happens
1832            on all databases we will be dealing with. */
1833
1834         /* verify that we have all the databases any other node has */
1835         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1836         if (ret != 0) {
1837                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1838                 return -1;
1839         }
1840
1841         /* verify that all other nodes have all our databases */
1842         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1845                 return -1;
1846         }
1847         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1848
1849         /* update the database priority for all remote databases */
1850         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1851         if (ret != 0) {
1852                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1853         }
1854         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1855
1856
1857         /* update all other nodes to use the same setting for reclock files
1858            as the local recovery master.
1859         */
1860         sync_recovery_lock_file_across_cluster(rec);
1861
1862         /* set recovery mode to active on all nodes */
1863         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1866                 return -1;
1867         }
1868
1869         /* execute the "startrecovery" event script on all nodes */
1870         ret = run_startrecovery_eventscript(rec, nodemap);
1871         if (ret!=0) {
1872                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1873                 return -1;
1874         }
1875
1876         /*
1877           update all nodes to have the same flags that we have
1878          */
1879         for (i=0;i<nodemap->num;i++) {
1880                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1881                         continue;
1882                 }
1883
1884                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1885                 if (ret != 0) {
1886                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1887                                 DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
1888                         } else {
1889                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1890                                 return -1;
1891                         }
1892                 }
1893         }
1894
1895         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1896
1897         /* pick a new generation number */
1898         generation = new_generation();
1899
1900         /* change the vnnmap on this node to use the new generation 
1901            number but not on any other nodes.
1902            this guarantees that if we abort the recovery prematurely
1903            for some reason (a node stops responding?)
1904            that we can just return immediately and we will reenter
1905            recovery shortly again.
1906            I.e. we deliberately leave the cluster with an inconsistent
1907            generation id to allow us to abort recovery at any stage and
1908            just restart it from scratch.
1909          */
1910         vnnmap->generation = generation;
1911         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1912         if (ret != 0) {
1913                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1914                 return -1;
1915         }
1916
1917         data.dptr = (void *)&generation;
1918         data.dsize = sizeof(uint32_t);
1919
1920         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1921         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1922                                         nodes, 0,
1923                                         CONTROL_TIMEOUT(), false, data,
1924                                         NULL,
1925                                         transaction_start_fail_callback,
1926                                         rec) != 0) {
1927                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1928                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1929                                         nodes, 0,
1930                                         CONTROL_TIMEOUT(), false, tdb_null,
1931                                         NULL,
1932                                         NULL,
1933                                         NULL) != 0) {
1934                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1935                 }
1936                 return -1;
1937         }
1938
1939         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1940
1941         for (i=0;i<dbmap->num;i++) {
1942                 ret = recover_database(rec, mem_ctx,
1943                                        dbmap->dbs[i].dbid,
1944                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1945                                        pnn, nodemap, generation);
1946                 if (ret != 0) {
1947                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1948                         return -1;
1949                 }
1950         }
1951
1952         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1953
1954         /* commit all the changes */
1955         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1956                                         nodes, 0,
1957                                         CONTROL_TIMEOUT(), false, data,
1958                                         NULL, NULL,
1959                                         NULL) != 0) {
1960                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1961                 return -1;
1962         }
1963
1964         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1965         
1966
1967         /* update the capabilities for all nodes */
1968         ret = update_capabilities(ctdb, nodemap);
1969         if (ret!=0) {
1970                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1971                 return -1;
1972         }
1973
1974         /* build a new vnn map with all the currently active and
1975            unbanned nodes */
1976         generation = new_generation();
1977         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1978         CTDB_NO_MEMORY(ctdb, vnnmap);
1979         vnnmap->generation = generation;
1980         vnnmap->size = 0;
1981         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1982         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1983         for (i=j=0;i<nodemap->num;i++) {
1984                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1985                         continue;
1986                 }
1987                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1988                         /* this node cannot be an lmaster */
1989                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1990                         continue;
1991                 }
1992
1993                 vnnmap->size++;
1994                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1995                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1996                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1997
1998         }
1999         if (vnnmap->size == 0) {
2000                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2001                 vnnmap->size++;
2002                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2003                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2004                 vnnmap->map[0] = pnn;
2005         }       
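
        /*
         * Illustrative note (hash selection as in ctdb_lmaster()): the
         * vnnmap built above lists the lmaster-capable active nodes; a
         * record's lmaster is map[ctdb_hash(key) % size], so a map of
         * e.g. {0, 2, 3} spreads record ownership across those three
         * PNNs only.
         */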
2006
2007         /* update to the new vnnmap on all nodes */
2008         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2009         if (ret != 0) {
2010                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2011                 return -1;
2012         }
2013
2014         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2015
2016         /* update recmaster to point to us for all nodes */
2017         ret = set_recovery_master(ctdb, nodemap, pnn);
2018         if (ret!=0) {
2019                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2020                 return -1;
2021         }
2022
2023         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2024
2025         /* disable recovery mode */
2026         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2029                 return -1;
2030         }
2031
2032         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2033
2034         /* Fetch known/available public IPs from each active node */
2035         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2036         if (ret != 0) {
2037                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2038                                  culprit));
2039                 rec->need_takeover_run = true;
2040                 return -1;
2041         }
2042
2043         do_takeover_run(rec, nodemap, false);
2044
2045         /* execute the "recovered" event script on all nodes */
2046         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2047         if (ret!=0) {
2048                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2049                 return -1;
2050         }
2051
2052         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2053
2054         /* send a message to all clients telling them that the cluster 
2055            has been reconfigured */
2056         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2057                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2058         if (ret != 0) {
2059                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2060                 return -1;
2061         }
2062
2063         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2064
2065         rec->need_recovery = false;
2066
2067         /* we managed to complete a full recovery, make sure to forgive
2068            any past sins by the nodes that could now participate in the
2069            recovery.
2070         */
2071         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2072         for (i=0;i<nodemap->num;i++) {
2073                 struct ctdb_banning_state *ban_state;
2074
2075                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2076                         continue;
2077                 }
2078
2079                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2080                 if (ban_state == NULL) {
2081                         continue;
2082                 }
2083
2084                 ban_state->count = 0;
2085         }
2086
2087
2088         /* We just finished a recovery successfully. 
2089            We now wait for rerecovery_timeout before we allow 
2090            another recovery to take place.
2091         */
2092         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2093         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2094         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2095
2096         return 0;
2097 }
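
/*
 * Recap of the phases above: ban misbehaving nodes, take the recovery
 * lock, create missing databases everywhere, freeze the cluster
 * (recovery mode ACTIVE) and run "startrecovery", sync node flags,
 * bump the local generation, start a cluster-wide transaction, recover
 * every database, commit, rebuild and distribute the vnnmap, reassert
 * ourselves as recmaster, thaw (recovery mode NORMAL), reallocate
 * public IPs, run "recovered", broadcast CTDB_SRVID_RECONFIGURE, then
 * reset ban counts and suppress re-recovery for rerecovery_timeout
 * seconds.
 */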
2098
2099
2100 /*
2101   elections are won by first checking the number of connected nodes, then
2102   the priority time, then the pnn
2103  */
2104 struct election_message {
2105         uint32_t num_connected;
2106         struct timeval priority_time;
2107         uint32_t pnn;
2108         uint32_t node_flags;
2109 };
2110
2111 /*
2112   form this node's election data
2113  */
2114 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2115 {
2116         int ret, i;
2117         struct ctdb_node_map *nodemap;
2118         struct ctdb_context *ctdb = rec->ctdb;
2119
2120         ZERO_STRUCTP(em);
2121
2122         em->pnn = rec->ctdb->pnn;
2123         em->priority_time = rec->priority_time;
2124
2125         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2126         if (ret != 0) {
2127                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2128                 return;
2129         }
2130
2131         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2132         em->node_flags = rec->node_flags;
2133
2134         for (i=0;i<nodemap->num;i++) {
2135                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2136                         em->num_connected++;
2137                 }
2138         }
2139
2140         /* we shouldn't try to win this election if we can't be a recmaster */
2141         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2142                 em->num_connected = 0;
2143                 em->priority_time = timeval_current();
2144         }
2145
2146         talloc_free(nodemap);
2147 }
2148
2149 /*
2150   see if the given election data wins
2151  */
2152 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2153 {
2154         struct election_message myem;
2155         int cmp = 0;
2156
2157         ctdb_election_data(rec, &myem);
2158
2159         /* we can't win if we don't have the recmaster capability */
2160         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2161                 return false;
2162         }
2163
2164         /* we can't win if we are banned */
2165         if (rec->node_flags & NODE_FLAGS_BANNED) {
2166                 return false;
2167         }
2168
2169         /* we can't win if we are stopped */
2170         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2171                 return false;
2172         }
2173
2174         /* we will automatically win if the other node is banned */
2175         if (em->node_flags & NODE_FLAGS_BANNED) {
2176                 return true;
2177         }
2178
2179         /* we will automatically win if the other node is stopped */
2180         if (em->node_flags & NODE_FLAGS_STOPPED) {
2181                 return true;
2182         }
2183
2184         /* try to use the most connected node */
2185         if (cmp == 0) {
2186                 cmp = (int)myem.num_connected - (int)em->num_connected;
2187         }
2188
2189         /* then the longest running node */
2190         if (cmp == 0) {
2191                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2192         }
2193
2194         if (cmp == 0) {
2195                 cmp = (int)myem.pnn - (int)em->pnn;
2196         }
2197
2198         return cmp > 0;
2199 }
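
/*
 * Worked example of the ordering above (assuming timeval_compare()
 * returns > 0 when its first argument is the later time): more
 * connected nodes win first; on a tie the smaller (earlier)
 * priority_time wins, i.e. the longest-running node; and on a full tie
 * cmp reduces to myem.pnn - em->pnn, so the higher PNN wins.
 */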
2200
2201 /*
2202   send out an election request
2203  */
2204 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2205 {
2206         int ret;
2207         TDB_DATA election_data;
2208         struct election_message emsg;
2209         uint64_t srvid;
2210         struct ctdb_context *ctdb = rec->ctdb;
2211
2212         srvid = CTDB_SRVID_RECOVERY;
2213
2214         ctdb_election_data(rec, &emsg);
2215
2216         election_data.dsize = sizeof(struct election_message);
2217         election_data.dptr  = (unsigned char *)&emsg;
2218
2219
2220         /* first we assume we will win the election and set the
2221            recovery master to be ourselves on the current node
2222          */
2223         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2224         if (ret != 0) {
2225                 DEBUG(DEBUG_ERR, (__location__ " failed to set ourselves as recmaster on the local node\n"));
2226                 return -1;
2227         }
2228
2229
2230         /* send an election message to all active nodes */
2231         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2232         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2233 }
2234
2235 /*
2236   this function will unban all nodes in the cluster
2237 */
2238 static void unban_all_nodes(struct ctdb_context *ctdb)
2239 {
2240         int ret, i;
2241         struct ctdb_node_map *nodemap;
2242         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2243         
2244         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2245         if (ret != 0) {
2246                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2247                 return;
2248         }
2249
2250         for (i=0;i<nodemap->num;i++) {
2251                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2252                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2253                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2254                                                  nodemap->nodes[i].pnn, 0,
2255                                                  NODE_FLAGS_BANNED);
2256                         if (ret != 0) {
2257                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2258                         }
2259                 }
2260         }
2261
2262         talloc_free(tmp_ctx);
2263 }
2264
2265
2266 /*
2267   we think we are winning the election - send a broadcast election request
2268  */
2269 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2270 {
2271         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2272         int ret;
2273
2274         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2275         if (ret != 0) {
2276                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2277         }
2278
2279         talloc_free(rec->send_election_te);
2280         rec->send_election_te = NULL;
2281 }
2282
2283 /*
2284   handler for memory dumps
2285 */
2286 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2287                              TDB_DATA data, void *private_data)
2288 {
2289         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2290         TDB_DATA *dump;
2291         int ret;
2292         struct srvid_request *rd;
2293
2294         if (data.dsize != sizeof(struct srvid_request)) {
2295                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2296                 talloc_free(tmp_ctx);
2297                 return;
2298         }
2299         rd = (struct srvid_request *)data.dptr;
2300
2301         dump = talloc_zero(tmp_ctx, TDB_DATA);
2302         if (dump == NULL) {
2303                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2304                 talloc_free(tmp_ctx);
2305                 return;
2306         }
2307         ret = ctdb_dump_memory(ctdb, dump);
2308         if (ret != 0) {
2309                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2310                 talloc_free(tmp_ctx);
2311                 return;
2312         }
2313
2314         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2315
2316         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2319                 talloc_free(tmp_ctx);
2320                 return;
2321         }
2322
2323         talloc_free(tmp_ctx);
2324 }
2325
2326 /*
2327   handler for getlog
2328 */
2329 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2330                            TDB_DATA data, void *private_data)
2331 {
2332         struct ctdb_get_log_addr *log_addr;
2333         pid_t child;
2334
2335         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2336                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2337                 return;
2338         }
2339         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2340
2341         child = ctdb_fork_no_free_ringbuffer(ctdb);
2342         if (child == (pid_t)-1) {
2343                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2344                 return;
2345         }
2346
2347         if (child == 0) {
2348                 ctdb_set_process_name("ctdb_rec_log_collector");
2349                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2350                         DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch log collector child into client mode.\n"));
2351                         _exit(1);
2352                 }
2353                 ctdb_collect_log(ctdb, log_addr);
2354                 _exit(0);
2355         }
2356 }
2357
2358 /*
2359   handler for clearlog
2360 */
2361 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2362                              TDB_DATA data, void *private_data)
2363 {
2364         ctdb_clear_log(ctdb);
2365 }
2366
2367 /*
2368   handler for reload_nodes
2369 */
2370 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2371                              TDB_DATA data, void *private_data)
2372 {
2373         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2374
2375         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2376
2377         ctdb_load_nodes_file(rec->ctdb);
2378 }
2379
2380
2381 static void ctdb_rebalance_timeout(struct event_context *ev,
2382                                    struct timed_event *te,
2383                                    struct timeval t, void *p)
2384 {
2385         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2386
2387         if (rec->force_rebalance_nodes == NULL) {
2388                 DEBUG(DEBUG_ERR,
2389                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2390                 return;
2391         }
2392
2393         DEBUG(DEBUG_NOTICE,
2394               ("Rebalance timeout occurred - do takeover run\n"));
2395         do_takeover_run(rec, rec->nodemap, false);
2396 }
2397
2398         
2399 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2400                                         uint64_t srvid,
2401                                         TDB_DATA data, void *private_data)
2402 {
2403         uint32_t pnn;
2404         uint32_t *t;
2405         int len;
2406         uint32_t deferred_rebalance;
2407         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2408
2409         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2410                 return;
2411         }
2412
2413         if (data.dsize != sizeof(uint32_t)) {
2414                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2415                 return;
2416         }
2417
2418         pnn = *(uint32_t *)&data.dptr[0];
2419
2420         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2421
2422         /* Copy any existing list of nodes.  There's probably some
2423          * sort of realloc variant that will do this but we need to
2424          * make sure that freeing the old array also cancels the timer
2425          * event for the timeout... not sure if realloc will do that.
2426          */
2427         len = (rec->force_rebalance_nodes != NULL) ?
2428                 talloc_array_length(rec->force_rebalance_nodes) :
2429                 0;
2430
2431         /* This allows duplicates to be added but they don't cause
2432          * harm.  A call to add a duplicate PNN arguably means that
2433          * the timeout should be reset, so this is the simplest
2434          * solution.
2435          */
2436         t = talloc_zero_array(rec, uint32_t, len+1);
2437         CTDB_NO_MEMORY_VOID(ctdb, t);
2438         if (len > 0) {
2439                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2440         }
2441         t[len] = pnn;
2442
2443         talloc_free(rec->force_rebalance_nodes);
2444
2445         rec->force_rebalance_nodes = t;
2446
2447         /* If configured, setup a deferred takeover run to make sure
2448          * that certain nodes get IPs rebalanced to them.  This will
2449          * be cancelled if a successful takeover run happens before
2450          * the timeout.  Assign tunable value to variable for
2451          * readability.
2452          */
2453         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2454         if (deferred_rebalance != 0) {
2455                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2456                                 timeval_current_ofs(deferred_rebalance, 0),
2457                                 ctdb_rebalance_timeout, rec);
2458         }
2459 }
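
/*
 * Lifetime note for the code above: the rebalance timer is allocated
 * off rec->force_rebalance_nodes, so freeing that array - either via
 * talloc_free() here when a node is added, or via TALLOC_FREE() after
 * a successful takeover run - implicitly cancels any pending timeout.
 * This is why a plain realloc of the array would not be safe.
 */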
2460
2461
2462
2463 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2464                              TDB_DATA data, void *private_data)
2465 {
2466         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2467         struct ctdb_public_ip *ip;
2468
2469         if (rec->recmaster != rec->ctdb->pnn) {
2470                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2471                 return;
2472         }
2473
2474         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2475                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2476                 return;
2477         }
2478
2479         ip = (struct ctdb_public_ip *)data.dptr;
2480
2481         update_ip_assignment_tree(rec->ctdb, ip);
2482 }
2483
2484
2485 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2486 {
2487         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2488 }
2489
2490 static void reenable_takeover_runs(struct event_context *ev,
2491                                    struct timed_event *te,
2492                                    struct timeval yt, void *p)
2493 {
2494         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2495
2496         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2497         clear_takeover_runs_disable(rec);
2498 }
2499
2500 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2501                                           uint64_t srvid, TDB_DATA data,
2502                                           void *private_data)
2503 {
2504         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2505                                                     struct ctdb_recoverd);
2506         struct srvid_request_data *r;
2507         uint32_t timeout;
2508         TDB_DATA result;
2509         int32_t ret = 0;
2510
2511         /* Validate input data */
2512         if (data.dsize != sizeof(struct srvid_request_data)) {
2513                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu "
2514                                  "expecting %lu\n", (long unsigned)data.dsize,
2515                                  (long unsigned)sizeof(struct srvid_request_data)));
2516                 return;
2517         }
2518         if (data.dptr == NULL) {
2519                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2520                 return;
2521         }
2522
2523         r = (struct srvid_request_data *)data.dptr;
2524         timeout = r->data;
2525
2526         if (timeout == 0) {
2527                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2528                 clear_takeover_runs_disable(rec);
2529                 ret = ctdb_get_pnn(ctdb);
2530                 goto done;
2531         }
2532
2533         if (rec->takeover_run_in_progress) {
2534                 DEBUG(DEBUG_ERR,
2535                       ("Unable to disable takeover runs - in progress\n"));
2536                 ret = -EAGAIN;
2537                 goto done;
2538         }
2539
2540         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2541
2542         /* Clear any old timers */
2543         clear_takeover_runs_disable(rec);
2544
2545         /* When this is non-NULL it indicates that takeover runs are
2546          * disabled.  This context also holds the timeout timer.
2547          */
2548         rec->takeover_runs_disable_ctx = talloc_new(rec);
2549         if (rec->takeover_runs_disable_ctx == NULL) {
2550                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2551                 ret = -ENOMEM;
2552                 goto done;
2553         }
2554
2555         /* Arrange for the timeout to occur */
2556         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2557                         timeval_current_ofs(timeout, 0),
2558                         reenable_takeover_runs,
2559                         rec);
2560
2561         /* Returning our PNN tells the caller that we succeeded */
2562         ret = ctdb_get_pnn(ctdb);
2563 done:
2564         result.dsize = sizeof(int32_t);
2565         result.dptr  = (uint8_t *)&ret;
2566         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2567 }
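
/*
 * Client-side usage sketch (hypothetical caller, not part of this
 * file): to disable takeover runs for 30 seconds, send a struct
 * srvid_request_data { .pnn = <own pnn>, .srvid = <own reply srvid>,
 * .data = 30 } to CTDB_SRVID_DISABLE_TAKEOVER_RUNS on the recmaster
 * and wait for the int32_t reply: the recmaster's PNN on success, a
 * negative errno value otherwise.
 */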
2568
2569 /* Backward compatibility for this SRVID - call
2570  * disable_takeover_runs_handler() instead
2571  */
2572 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2573                                      TDB_DATA data, void *private_data)
2574 {
2575         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2576                                                     struct ctdb_recoverd);
2577         TDB_DATA data2;
2578         struct srvid_request_data *req;
2579
2580         if (data.dsize != sizeof(uint32_t)) {
2581                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu "
2582                                  "expecting %lu\n", (long unsigned)data.dsize,
2583                                  (long unsigned)sizeof(uint32_t)));
2584                 return;
2585         }
2586         if (data.dptr == NULL) {
2587                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2588                 return;
2589         }
2590
2591         req = talloc(ctdb, struct srvid_request_data);
2592         CTDB_NO_MEMORY_VOID(ctdb, req);
2593
2594         req->srvid = 0; /* No reply */
2595         req->pnn = -1;
2596         req->data = *((uint32_t *)data.dptr); /* Timeout */
2597
2598         data2.dsize = sizeof(*req);
2599         data2.dptr = (uint8_t *)req;
2600
2601         disable_takeover_runs_handler(rec->ctdb,
2602                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2603                                       data2, rec);
2604 }
2605
2606 /*
2607   handler for ip reallocate, just add it to the list of requests and 
2608   handle this later in the monitor_cluster loop so we do not recurse
2609   with other requests to takeover_run()
2610 */
2611 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2612                                   TDB_DATA data, void *private_data)
2613 {
2614         struct srvid_request *request;
2615         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2616                                                     struct ctdb_recoverd);
2617
2618         if (data.dsize != sizeof(struct srvid_request)) {
2619                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2620                 return;
2621         }
2622
2623         request = (struct srvid_request *)data.dptr;
2624
2625         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2626 }
2627
2628 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2629                                           struct ctdb_recoverd *rec)
2630 {
2631         TDB_DATA result;
2632         int32_t ret;
2633         uint32_t culprit;
2634         struct srvid_requests *current;
2635
2636         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2637
2638         /* Only process requests that are currently pending.  More
2639          * might come in while the takeover run is in progress and
2640          * they will need to be processed later since they might
2641          * be in response to flag changes.
2642          */
2643         current = rec->reallocate_requests;
2644         rec->reallocate_requests = NULL;
2645
2646         /* update the list of public ips that a node can handle for
2647            all connected nodes
2648         */
2649         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2650         if (ret != 0) {
2651                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2652                                  culprit));
2653                 rec->need_takeover_run = true;
2654         }
2655         if (ret == 0) {
2656                 if (do_takeover_run(rec, rec->nodemap, false)) {
2657                         ret = ctdb_get_pnn(ctdb);
2658                 } else {
2659                         ret = -1;
2660                 }
2661         }
2662
2663         result.dsize = sizeof(int32_t);
2664         result.dptr  = (uint8_t *)&ret;
2665
2666         srvid_requests_reply(ctdb, &current, result);
2667 }
2668
2669
2670 /*
2671   handler for recovery master elections
2672 */
2673 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2674                              TDB_DATA data, void *private_data)
2675 {
2676         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2677         int ret;
2678         struct election_message *em = (struct election_message *)data.dptr;
2679         TALLOC_CTX *mem_ctx;
2680
2681         /* Ignore election packets from ourself */
2682         if (ctdb->pnn == em->pnn) {
2683                 return;
2684         }
2685
2686         /* we got an election packet - update the timeout for the election */
2687         talloc_free(rec->election_timeout);
2688         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2689                                                 fast_start ?
2690                                                 timeval_current_ofs(0, 500000) :
2691                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2692                                                 ctdb_election_timeout, rec);
2693
2694         mem_ctx = talloc_new(ctdb);
2695
2696         /* someone called an election. check their election data
2697            and if we disagree and we would rather be the elected node, 
2698            send a new election message to all other nodes
2699          */
2700         if (ctdb_election_win(rec, em)) {
2701                 if (!rec->send_election_te) {
2702                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2703                                                                 timeval_current_ofs(0, 500000),
2704                                                                 election_send_request, rec);
2705                 }
2706                 talloc_free(mem_ctx);
2707                 /*unban_all_nodes(ctdb);*/
2708                 return;
2709         }
2710         
2711         /* we didn't win */
2712         talloc_free(rec->send_election_te);
2713         rec->send_election_te = NULL;
2714
2715         if (ctdb->tunable.verify_recovery_lock != 0) {
2716                 /* release the recmaster lock */
2717                 if (em->pnn != ctdb->pnn &&
2718                     ctdb->recovery_lock_fd != -1) {
2719                         close(ctdb->recovery_lock_fd);
2720                         ctdb->recovery_lock_fd = -1;
2721                         unban_all_nodes(ctdb);
2722                 }
2723         }
2724
2725         /* ok, let that guy become recmaster then */
2726         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2727         if (ret != 0) {
2728                 DEBUG(DEBUG_ERR, (__location__ " failed to set new recmaster\n"));
2729                 talloc_free(mem_ctx);
2730                 return;
2731         }
2732
2733         talloc_free(mem_ctx);
2734         return;
2735 }
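
/*
 * Note: when we lose the election and reclock verification is enabled,
 * closing recovery_lock_fd above drops our lock on the reclock file
 * (the fcntl lock goes away with the fd), freeing the winning node to
 * take it.
 */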
2736
2737
2738 /*
2739   force the start of the election process
2740  */
2741 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2742                            struct ctdb_node_map *nodemap)
2743 {
2744         int ret;
2745         struct ctdb_context *ctdb = rec->ctdb;
2746
2747         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2748
2749         /* set all nodes to recovery mode to stop all internode traffic */
2750         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2751         if (ret != 0) {
2752                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2753                 return;
2754         }
2755
2756         talloc_free(rec->election_timeout);
2757         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2758                                                 fast_start ?
2759                                                 timeval_current_ofs(0, 500000) :
2760                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2761                                                 ctdb_election_timeout, rec);
2762
2763         ret = send_election_request(rec, pnn);
2764         if (ret!=0) {
2765                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2766                 return;
2767         }
2768
2769         /* wait for a few seconds to collect all responses */
2770         ctdb_wait_election(rec);
2771 }
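
/* Illustrative call site (mirroring the main loop further down): an
   election is forced whenever the recovery master looks wrong, e.g.

        if (rec->recmaster == (uint32_t)-1) {
                force_election(rec, ctdb_get_pnn(ctdb), nodemap);
                return;
        }

   force_election() blocks in ctdb_wait_election() until the election
   window has passed, so callers simply return to the main loop after it.
 */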
2772
2773
2774
2775 /*
2776   handler for when a node changes its flags
2777 */
2778 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2779                             TDB_DATA data, void *private_data)
2780 {
2781         int ret;
2782         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2783         struct ctdb_node_map *nodemap=NULL;
2784         TALLOC_CTX *tmp_ctx;
2785         int i;
2786         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2787         int disabled_flag_changed;
2788
2789         if (data.dsize != sizeof(*c)) {
2790                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2791                 return;
2792         }
2793
2794         tmp_ctx = talloc_new(ctdb);
2795         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2796
2797         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2798         if (ret != 0) {
2799                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2800                 talloc_free(tmp_ctx);
2801                 return;         
2802         }
2803
2804
2805         for (i=0;i<nodemap->num;i++) {
2806                 if (nodemap->nodes[i].pnn == c->pnn) break;
2807         }
2808
2809         if (i == nodemap->num) {
2810                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2811                 talloc_free(tmp_ctx);
2812                 return;
2813         }
2814
2815         if (c->old_flags != c->new_flags) {
2816                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2817         }
2818
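        /* XOR of our locally cached flags with the announced new flags
           yields the bits that differ; masking with NODE_FLAGS_DISABLED
           leaves a non-zero value only when a "disabled" bit changed in
           either direction, which is the one case that needs a takeover
           run (see the comment further down).
         */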
2819         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2820
2821         nodemap->nodes[i].flags = c->new_flags;
2822
2823         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2824                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2825
2826         if (ret == 0) {
2827                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2828                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2829         }
2830         
2831         if (ret == 0 &&
2832             ctdb->recovery_master == ctdb->pnn &&
2833             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2834                 /* Only do the takeover run if the permanently-disabled
2835                    or unhealthy flags changed, since these cause an ip
2836                    failover but not a recovery.
2837                    If the node became disconnected or banned this also
2838                    leads to an ip address failover, but that case is
2839                    handled during recovery.
2840                 */
2841                 if (disabled_flag_changed) {
2842                         rec->need_takeover_run = true;
2843                 }
2844         }
2845
2846         talloc_free(tmp_ctx);
2847 }
2848
2849 /*
2850   handler for when we need to push out flag changes to all other nodes
2851 */
2852 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2853                             TDB_DATA data, void *private_data)
2854 {
2855         int ret;
2856         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2857         struct ctdb_node_map *nodemap=NULL;
2858         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2859         uint32_t recmaster;
2860         uint32_t *nodes;
2861
2862         /* find the recovery master */
2863         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2864         if (ret != 0) {
2865                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2866                 talloc_free(tmp_ctx);
2867                 return;
2868         }
2869
2870         /* read the node flags from the recmaster */
2871         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2872         if (ret != 0) {
2873                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2874                 talloc_free(tmp_ctx);
2875                 return;
2876         }
2877         if (c->pnn >= nodemap->num) {
2878                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %u\n", c->pnn));
2879                 talloc_free(tmp_ctx);
2880                 return;
2881         }
2882
2883         /* send the flags update to all connected nodes */
2884         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2885
2886         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2887                                       nodes, 0, CONTROL_TIMEOUT(),
2888                                       false, data,
2889                                       NULL, NULL,
2890                                       NULL) != 0) {
2891                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2892
2893                 talloc_free(tmp_ctx);
2894                 return;
2895         }
2896
2897         talloc_free(tmp_ctx);
2898 }
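
/* The fan-out idiom above recurs throughout this file: build a pnn list
   with list_of_connected_nodes() (or list_of_active_nodes()) and send one
   control to all of them in a single call.  A minimal sketch, where data
   is the TDB_DATA payload received by the handler and error handling is
   elided:

        uint32_t *nodes = list_of_connected_nodes(ctdb, nodemap,
                                                  tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
                                      nodes, 0, CONTROL_TIMEOUT(),
                                      false, data,
                                      NULL, NULL, NULL) != 0) {
                ... at least one node failed ...
        }

   The trailing boolean to list_of_connected_nodes() chooses whether the
   local node is included in the list.
 */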
2899
2900
2901 struct verify_recmode_normal_data {
2902         uint32_t count;
2903         enum monitor_result status;
2904 };
2905
2906 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2907 {
2908         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2909
2910
2911         /* one more node has responded with recmode data */
2912         rmdata->count--;
2913
2914         /* if we failed to get the recmode, then return an error and let
2915            the main loop try again.
2916         */
2917         if (state->state != CTDB_CONTROL_DONE) {
2918                 if (rmdata->status == MONITOR_OK) {
2919                         rmdata->status = MONITOR_FAILED;
2920                 }
2921                 return;
2922         }
2923
2924         /* if we got a response, then the recmode will be stored in the
2925            status field
2926         */
2927         if (state->status != CTDB_RECOVERY_NORMAL) {
2928                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2929                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2930         }
2931
2932         return;
2933 }
2934
2935
2936 /* verify that all nodes are in normal recovery mode */
2937 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2938 {
2939         struct verify_recmode_normal_data *rmdata;
2940         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2941         struct ctdb_client_control_state *state;
2942         enum monitor_result status;
2943         int j;
2944         
2945         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2946         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2947         rmdata->count  = 0;
2948         rmdata->status = MONITOR_OK;
2949
2950         /* loop over all active nodes and send an async getrecmode call to 
2951            them */
2952         for (j=0; j<nodemap->num; j++) {
2953                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2954                         continue;
2955                 }
2956                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2957                                         CONTROL_TIMEOUT(), 
2958                                         nodemap->nodes[j].pnn);
2959                 if (state == NULL) {
2960                         /* we failed to send the control, treat this as 
2961                            an error and try again next iteration
2962                         */                      
2963                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2964                         talloc_free(mem_ctx);
2965                         return MONITOR_FAILED;
2966                 }
2967
2968                 /* set up the callback functions */
2969                 state->async.fn = verify_recmode_normal_callback;
2970                 state->async.private_data = rmdata;
2971
2972                 /* one more control to wait for to complete */
2973                 rmdata->count++;
2974         }
2975
2976
2977         /* now wait for up to the maximum number of seconds allowed
2978            or until all nodes we expect a response from have replied
2979         */
2980         while (rmdata->count > 0) {
2981                 event_loop_once(ctdb->ev);
2982         }
2983
2984         status = rmdata->status;
2985         talloc_free(mem_ctx);
2986         return status;
2987 }
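
/* verify_recmode() above and verify_recmaster() below share one
   scatter/gather shape: send an async control to every active node,
   count the outstanding requests, then pump the event loop until the
   counter reaches zero.  A condensed sketch, where my_callback is an
   illustrative stand-in that decrements the counter as each reply (or
   timeout) arrives:

        for (each active node) {
                state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx,
                                                  CONTROL_TIMEOUT(), pnn);
                state->async.fn = my_callback;
                state->async.private_data = rmdata;
                rmdata->count++;
        }
        while (rmdata->count > 0) {
                event_loop_once(ctdb->ev);
        }

   The per-control timeout bounds the wait: a timed-out control still
   completes, just with state->state != CTDB_CONTROL_DONE.
 */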
2988
2989
2990 struct verify_recmaster_data {
2991         struct ctdb_recoverd *rec;
2992         uint32_t count;
2993         uint32_t pnn;
2994         enum monitor_result status;
2995 };
2996
2997 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2998 {
2999         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3000
3001
3002         /* one more node has responded with recmaster data */
3003         rmdata->count--;
3004
3005         /* if we failed to get the recmaster, then return an error and let
3006            the main loop try again.
3007         */
3008         if (state->state != CTDB_CONTROL_DONE) {
3009                 if (rmdata->status == MONITOR_OK) {
3010                         rmdata->status = MONITOR_FAILED;
3011                 }
3012                 return;
3013         }
3014
3015         /* if we got a response, then the recmaster will be stored in the
3016            status field
3017         */
3018         if (state->status != rmdata->pnn) {
3019                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3020                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3021                 rmdata->status = MONITOR_ELECTION_NEEDED;
3022         }
3023
3024         return;
3025 }
3026
3027
3028 /* verify that all nodes agree that we are the recmaster */
3029 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3030 {
3031         struct ctdb_context *ctdb = rec->ctdb;
3032         struct verify_recmaster_data *rmdata;
3033         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3034         struct ctdb_client_control_state *state;
3035         enum monitor_result status;
3036         int j;
3037         
3038         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3039         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3040         rmdata->rec    = rec;
3041         rmdata->count  = 0;
3042         rmdata->pnn    = pnn;
3043         rmdata->status = MONITOR_OK;
3044
3045         /* loop over all active nodes and send an async getrecmaster call to 
3046            them */
3047         for (j=0; j<nodemap->num; j++) {
3048                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3049                         continue;
3050                 }
3051                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3052                                         CONTROL_TIMEOUT(),
3053                                         nodemap->nodes[j].pnn);
3054                 if (state == NULL) {
3055                         /* we failed to send the control, treat this as 
3056                            an error and try again next iteration
3057                         */                      
3058                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3059                         talloc_free(mem_ctx);
3060                         return MONITOR_FAILED;
3061                 }
3062
3063                 /* set up the callback functions */
3064                 state->async.fn = verify_recmaster_callback;
3065                 state->async.private_data = rmdata;
3066
3067                 /* one more control to wait for to complete */
3068                 rmdata->count++;
3069         }
3070
3071
3072         /* now wait for up to the maximum number of seconds allowed
3073            or until all nodes we expect a response from have replied
3074         */
3075         while (rmdata->count > 0) {
3076                 event_loop_once(ctdb->ev);
3077         }
3078
3079         status = rmdata->status;
3080         talloc_free(mem_ctx);
3081         return status;
3082 }
3083
3084 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3085                                     struct ctdb_recoverd *rec)
3086 {
3087         struct ctdb_control_get_ifaces *ifaces = NULL;
3088         TALLOC_CTX *mem_ctx;
3089         bool ret = false;
3090
3091         mem_ctx = talloc_new(NULL);
3092
3093         /* Read the interfaces from the local node */
3094         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3095                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3096                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3097                 /* We could return an error.  However, this will be
3098                  * rare so we'll decide that the interfaces have
3099                  * actually changed, just in case.
3100                  */
3101                 talloc_free(mem_ctx);
3102                 return true;
3103         }
3104
3105         if (!rec->ifaces) {
3106                 /* We haven't been here before so things have changed */
3107                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3108                 ret = true;
3109         } else if (rec->ifaces->num != ifaces->num) {
3110                 /* Number of interfaces has changed */
3111                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3112                                      rec->ifaces->num, ifaces->num));
3113                 ret = true;
3114         } else {
3115                 /* See if interface names or link states have changed */
3116                 int i;
3117                 for (i = 0; i < rec->ifaces->num; i++) {
3118                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3119                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3120                                 DEBUG(DEBUG_NOTICE,
3121                                       ("Interface in slot %d changed: %s => %s\n",
3122                                        i, iface->name, ifaces->ifaces[i].name));
3123                                 ret = true;
3124                                 break;
3125                         }
3126                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3127                                 DEBUG(DEBUG_NOTICE,
3128                                       ("Interface %s changed state: %d => %d\n",
3129                                        iface->name, iface->link_state,
3130                                        ifaces->ifaces[i].link_state));
3131                                 ret = true;
3132                                 break;
3133                         }
3134                 }
3135         }
3136
3137         talloc_free(rec->ifaces);
3138         rec->ifaces = talloc_steal(rec, ifaces);
3139
3140         talloc_free(mem_ctx);
3141         return ret;
3142 }
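
/* Note that the loop above compares interfaces by slot index, which
   assumes the daemon reports them in a stable order.  A pure reordering
   would therefore be reported as a name change and trigger a takeover
   run - a safe, if slightly over-eager, outcome.
 */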
3143
3144 /* called to check that the local allocation of public ip addresses is ok.
3145 */
3146 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3147 {
3148         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3149         struct ctdb_uptime *uptime1 = NULL;
3150         struct ctdb_uptime *uptime2 = NULL;
3151         int ret, j;
3152         bool need_takeover_run = false;
3153
3154         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3155                                 CTDB_CURRENT_NODE, &uptime1);
3156         if (ret != 0) {
3157                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3158                 talloc_free(mem_ctx);
3159                 return -1;
3160         }
3161
3162         if (interfaces_have_changed(ctdb, rec)) {
3163                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3164                                      "local node %u - force takeover run\n",
3165                                      pnn));
3166                 need_takeover_run = true;
3167         }
3168
3169         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3170                                 CTDB_CURRENT_NODE, &uptime2);
3171         if (ret != 0) {
3172                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3173                 talloc_free(mem_ctx);
3174                 return -1;
3175         }
3176
3177         /* skip the check if the startrecovery time has changed */
3178         if (timeval_compare(&uptime1->last_recovery_started,
3179                             &uptime2->last_recovery_started) != 0) {
3180                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3181                 talloc_free(mem_ctx);
3182                 return 0;
3183         }
3184
3185         /* skip the check if the endrecovery time has changed */
3186         if (timeval_compare(&uptime1->last_recovery_finished,
3187                             &uptime2->last_recovery_finished) != 0) {
3188                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3189                 talloc_free(mem_ctx);
3190                 return 0;
3191         }
3192
3193         /* skip the check if we have started but not finished recovery */
3194         if (timeval_compare(&uptime1->last_recovery_finished,
3195                             &uptime1->last_recovery_started) != 1) {
3196                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3197                 talloc_free(mem_ctx);
3198
3199                 return 0;
3200         }
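
        /* The two uptime snapshots deliberately bracket the interface
           check above: if a recovery started or finished in between,
           the ip layout may still be in flux, so the address checks
           below are skipped for this iteration rather than racing the
           recmaster.
         */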
3201
3202         /* verify that we have the ip addresses we should have and
3203            don't have ones we shouldn't have.
3204            if we find an inconsistency we ask the recovery master to
3205            perform a takeover run (see the message sent at the end of
3206            this function).
3207            also, if an ip is unassigned (pnn == -1) and we are healthy
3208            and could host it, we request an ip reallocation.
3209         */
3210         if (ctdb->tunable.disable_ip_failover == 0) {
3211                 struct ctdb_all_public_ips *ips = NULL;
3212
3213                 /* read the *available* IPs from the local node */
3214                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3215                 if (ret != 0) {
3216                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3217                         talloc_free(mem_ctx);
3218                         return -1;
3219                 }
3220
3221                 for (j=0; j<ips->num; j++) {
3222                         if (ips->ips[j].pnn == -1 &&
3223                             nodemap->nodes[pnn].flags == 0) {
3224                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3225                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3226                                 need_takeover_run = true;
3227                         }
3228                 }
3229
3230                 talloc_free(ips);
3231
3232                 /* read the *known* IPs from the local node */
3233                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3234                 if (ret != 0) {
3235                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3236                         talloc_free(mem_ctx);
3237                         return -1;
3238                 }
3239
3240                 for (j=0; j<ips->num; j++) {
3241                         if (ips->ips[j].pnn == pnn) {
3242                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3243                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3244                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3245                                         need_takeover_run = true;
3246                                 }
3247                         } else {
3248                                 if (ctdb->do_checkpublicip &&
3249                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3250
3251                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3252                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3253
3254                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3255                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3256                                         }
3257                                 }
3258                         }
3259                 }
3260         }
3261
3262         if (need_takeover_run) {
3263                 struct srvid_request rd;
3264                 TDB_DATA data;
3265
3266                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3267
3268                 rd.pnn = ctdb->pnn;
3269                 rd.srvid = 0;
3270                 data.dptr = (uint8_t *)&rd;
3271                 data.dsize = sizeof(rd);
3272
3273                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3274                 if (ret != 0) {
3275                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3276                 }
3277         }
3278         talloc_free(mem_ctx);
3279         return 0;
3280 }
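
/* A hedged usage sketch of the trigger above: any node can nudge the
   recovery master into a takeover run by sending CTDB_SRVID_TAKEOVER_RUN
   with its own pnn, using srvid == 0 when no completion reply is wanted:

        struct srvid_request rd = { .pnn = ctdb->pnn, .srvid = 0 };
        TDB_DATA data = { .dptr = (uint8_t *)&rd, .dsize = sizeof(rd) };
        ctdb_client_send_message(ctdb, rec->recmaster,
                                 CTDB_SRVID_TAKEOVER_RUN, data);
 */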
3281
3282
3283 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3284 {
3285         struct ctdb_node_map **remote_nodemaps = callback_data;
3286
3287         if (node_pnn >= ctdb->num_nodes) {
3288                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3289                 return;
3290         }
3291
3292         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3293
3294 }
3295
3296 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3297         struct ctdb_node_map *nodemap,
3298         struct ctdb_node_map **remote_nodemaps)
3299 {
3300         uint32_t *nodes;
3301
3302         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3304                                         nodes, 0,
3305                                         CONTROL_TIMEOUT(), false, tdb_null,
3306                                         async_getnodemap_callback,
3307                                         NULL,
3308                                         remote_nodemaps) != 0) {
3309                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3310
3311                 return -1;
3312         }
3313
3314         return 0;
3315 }
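
/* The callback above fires once per replying node.  Because the result
   array is handed in as callback_data and indexed by pnn, a slot that is
   still NULL after ctdb_client_async_control() returns means that node
   never answered; main_loop() below treats such a hole as a reason to
   restart monitoring.
 */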
3316
3317 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3318 struct ctdb_check_reclock_state {
3319         struct ctdb_context *ctdb;
3320         struct timeval start_time;
3321         int fd[2];
3322         pid_t child;
3323         struct timed_event *te;
3324         struct fd_event *fde;
3325         enum reclock_child_status status;
3326 };
3327
3328 /* when we free the reclock state we must kill any child process.
3329 */
3330 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3331 {
3332         struct ctdb_context *ctdb = state->ctdb;
3333
3334         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3335
3336         if (state->fd[0] != -1) {
3337                 close(state->fd[0]);
3338                 state->fd[0] = -1;
3339         }
3340         if (state->fd[1] != -1) {
3341                 close(state->fd[1]);
3342                 state->fd[1] = -1;
3343         }
3344         ctdb_kill(ctdb, state->child, SIGKILL);
3345         return 0;
3346 }
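
/* Once attached via talloc_set_destructor() below, this destructor makes
   a single talloc_free(state) close both pipe ends, report the lock
   latency and kill the child, so every later error path in
   check_recovery_lock() cleans up the same way.
 */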
3347
3348 /*
3349   called if our check_reclock child times out. this would happen if
3350   i/o to the reclock file blocks.
3351  */
3352 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3353                                          struct timeval t, void *private_data)
3354 {
3355         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3356                                            struct ctdb_check_reclock_state);
3357
3358         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3359         state->status = RECLOCK_TIMEOUT;
3360 }
3361
3362 /* this is called when the child process has completed checking the reclock
3363    file and has written data back to us through the pipe.
3364 */
3365 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3366                              uint16_t flags, void *private_data)
3367 {
3368         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3369                                              struct ctdb_check_reclock_state);
3370         char c = 0;
3371         int ret;
3372
3373         /* we got a response from our child process so we can abort the
3374            timeout.
3375         */
3376         talloc_free(state->te);
3377         state->te = NULL;
3378
3379         ret = read(state->fd[0], &c, 1);
3380         if (ret != 1 || c != RECLOCK_OK) {
3381                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3382                 state->status = RECLOCK_FAILED;
3383
3384                 return;
3385         }
3386
3387         state->status = RECLOCK_OK;
3388         return;
3389 }
3390
3391 static int check_recovery_lock(struct ctdb_context *ctdb)
3392 {
3393         int ret;
3394         struct ctdb_check_reclock_state *state;
3395         pid_t parent = getpid();
3396
3397         if (ctdb->recovery_lock_fd == -1) {
3398                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3399                 return -1;
3400         }
3401
3402         state = talloc(ctdb, struct ctdb_check_reclock_state);
3403         CTDB_NO_MEMORY(ctdb, state);
3404
3405         state->ctdb = ctdb;
3406         state->start_time = timeval_current();
3407         state->status = RECLOCK_CHECKING;
3408         state->fd[0] = -1;
3409         state->fd[1] = -1;
3410
3411         ret = pipe(state->fd);
3412         if (ret != 0) {
3413                 talloc_free(state);
3414                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3415                 return -1;
3416         }
3417
3418         state->child = ctdb_fork(ctdb);
3419         if (state->child == (pid_t)-1) {
3420                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3421                 close(state->fd[0]);
3422                 state->fd[0] = -1;
3423                 close(state->fd[1]);
3424                 state->fd[1] = -1;
3425                 talloc_free(state);
3426                 return -1;
3427         }
3428
3429         if (state->child == 0) {
3430                 char cc = RECLOCK_OK;
3431                 close(state->fd[0]);
3432                 state->fd[0] = -1;
3433
3434                 ctdb_set_process_name("ctdb_rec_reclock");
3435                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3436                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3437                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3438                         cc = RECLOCK_FAILED;
3439                 }
3440
3441                 if (write(state->fd[1], &cc, 1) != 1) {
                             DEBUG(DEBUG_CRIT,("failed to write to reclock pipe - %s\n",
                                               strerror(errno)));
                     }
3442                 /* make sure we die when our parent dies */
3443                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3444                         sleep(5);
3445                 }
3446                 _exit(0);
3447         }
3448         close(state->fd[1]);
3449         state->fd[1] = -1;
3450         set_close_on_exec(state->fd[0]);
3451
3452         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3453
3454         talloc_set_destructor(state, check_reclock_destructor);
3455
3456         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3457                                     ctdb_check_reclock_timeout, state);
3458         if (state->te == NULL) {
3459                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3460                 talloc_free(state);
3461                 return -1;
3462         }
3463
3464         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3465                                 EVENT_FD_READ,
3466                                 reclock_child_handler,
3467                                 (void *)state);
3468
3469         if (state->fde == NULL) {
3470                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3471                 talloc_free(state);
3472                 return -1;
3473         }
3474         tevent_fd_set_auto_close(state->fde);
3475
3476         while (state->status == RECLOCK_CHECKING) {
3477                 event_loop_once(ctdb->ev);
3478         }
3479
3480         if (state->status == RECLOCK_FAILED) {
3481                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3482                 close(ctdb->recovery_lock_fd);
3483                 ctdb->recovery_lock_fd = -1;
3484                 talloc_free(state);
3485                 return -1;
3486         }
3487
3488         talloc_free(state);
3489         return 0;
3490 }
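
/* check_recovery_lock() is an instance of a general pattern for i/o that
   may block indefinitely (here, pread() on a file in the cluster
   filesystem): perform the read in a forked child, report a one-byte
   status over a pipe, and race that pipe against a timed event in the
   parent.  A stripped-down sketch, error handling elided and
   do_blocking_check() purely hypothetical:

        int fd[2];
        pipe(fd);
        if (ctdb_fork(ctdb) == 0) {
                char c = do_blocking_check();
                write(fd[1], &c, 1);
                _exit(0);
        }
        close(fd[1]);
        ... event_add_timed() arms the deadline, event_add_fd() watches
        fd[0]; whichever fires first decides the status ...
 */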
3491
3492 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3493 {
3494         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3495         const char *reclockfile;
3496
3497         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3498                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3499                 talloc_free(tmp_ctx);
3500                 return -1;      
3501         }
3502
3503         if (reclockfile == NULL) {
3504                 if (ctdb->recovery_lock_file != NULL) {
3505                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3506                         talloc_free(ctdb->recovery_lock_file);
3507                         ctdb->recovery_lock_file = NULL;
3508                         if (ctdb->recovery_lock_fd != -1) {
3509                                 close(ctdb->recovery_lock_fd);
3510                                 ctdb->recovery_lock_fd = -1;
3511                         }
3512                 }
3513                 ctdb->tunable.verify_recovery_lock = 0;
3514                 talloc_free(tmp_ctx);
3515                 return 0;
3516         }
3517
3518         if (ctdb->recovery_lock_file == NULL) {
3519                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3520                 if (ctdb->recovery_lock_fd != -1) {
3521                         close(ctdb->recovery_lock_fd);
3522                         ctdb->recovery_lock_fd = -1;
3523                 }
3524                 talloc_free(tmp_ctx);
3525                 return 0;
3526         }
3527
3528
3529         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3530                 talloc_free(tmp_ctx);
3531                 return 0;
3532         }
3533
3534         talloc_free(ctdb->recovery_lock_file);
3535         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3536         ctdb->tunable.verify_recovery_lock = 0;
3537         if (ctdb->recovery_lock_fd != -1) {
3538                 close(ctdb->recovery_lock_fd);
3539                 ctdb->recovery_lock_fd = -1;
3540         }
3541
3542         talloc_free(tmp_ctx);
3543         return 0;
3544 }
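
/* update_recovery_lock_file() covers four situations: reclock unchanged
   (no-op), newly disabled, newly enabled, and path changed.  Whenever the
   configuration changes, the cached recovery_lock_fd is closed so that
   the lock is re-acquired against the current file on a later iteration
   of the main loop.
 */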
3545
3546 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3547                       TALLOC_CTX *mem_ctx)
3548 {
3549         uint32_t pnn;
3550         struct ctdb_node_map *nodemap=NULL;
3551         struct ctdb_node_map *recmaster_nodemap=NULL;
3552         struct ctdb_node_map **remote_nodemaps=NULL;
3553         struct ctdb_vnn_map *vnnmap=NULL;
3554         struct ctdb_vnn_map *remote_vnnmap=NULL;
3555         int32_t debug_level;
3556         int i, j, ret;
3557         bool self_ban;
3558
3559
3560         /* verify that the main daemon is still running */
3561         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3562                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3563                 exit(-1);
3564         }
3565
3566         /* ping the local daemon to tell it we are alive */
3567         ctdb_ctrl_recd_ping(ctdb);
3568
3569         if (rec->election_timeout) {
3570                 /* an election is in progress */
3571                 return;
3572         }
3573
3574         /* read the debug level from the parent and update locally */
3575         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3576         if (ret !=0) {
3577                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3578                 return;
3579         }
3580         LogLevel = debug_level;
3581
3582         /* get relevant tunables */
3583         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3584         if (ret != 0) {
3585                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3586                 return;
3587         }
3588
3589         /* get the current recovery lock file from the server */
3590         if (update_recovery_lock_file(ctdb) != 0) {
3591                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3592                 return;
3593         }
3594
3595         /* Make sure that if recovery lock verification becomes disabled,
3596            we close the file
3597         */
3598         if (ctdb->tunable.verify_recovery_lock == 0) {
3599                 if (ctdb->recovery_lock_fd != -1) {
3600                         close(ctdb->recovery_lock_fd);
3601                         ctdb->recovery_lock_fd = -1;
3602                 }
3603         }
3604
3605         pnn = ctdb_get_pnn(ctdb);
3606
3607         /* get the vnnmap */
3608         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3609         if (ret != 0) {
3610                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3611                 return;
3612         }
3613
3614
3615         /* get number of nodes */
3616         if (rec->nodemap) {
3617                 talloc_free(rec->nodemap);
3618                 rec->nodemap = NULL;
3619                 nodemap=NULL;
3620         }
3621         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3622         if (ret != 0) {
3623                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3624                 return;
3625         }
3626         nodemap = rec->nodemap;
3627
3628         /* remember our own node flags */
3629         rec->node_flags = nodemap->nodes[pnn].flags;
3630
3631         ban_misbehaving_nodes(rec, &self_ban);
3632         if (self_ban) {
3633                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3634                 return;
3635         }
3636
3637         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3638            also frozen and that the recmode is set to active.
3639         */
3640         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3641                 /* If this node has become inactive then we want to
3642                  * reduce the chances of it taking over the recovery
3643                  * master role when it becomes active again.  This
3644                  * helps to stabilise the recovery master role so that
3645                  * it stays on the most stable node.
3646                  */
3647                 rec->priority_time = timeval_current();
3648
3649                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3650                 if (ret != 0) {
3651                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3652                 }
3653                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3654                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3655
3656                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3657                         if (ret != 0) {
3658                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3659                                 return;
3660                         }
3661                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3662                         if (ret != 0) {
3663                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3664
3665                                 return;
3666                         }
3667                 }
3668
3669                 /* If this node is stopped or banned then it is not the recovery
3670                  * master, so don't do anything. This prevents stopped or banned
3671                  * node from starting election and sending unnecessary controls.
3672                  */
3673                 return;
3674         }
3675
3676         /* check which node is the recovery master */
3677         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3678         if (ret != 0) {
3679                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3680                 return;
3681         }
3682
3683         /* If we are not the recmaster then do some housekeeping */
3684         if (rec->recmaster != pnn) {
3685                 /* Ignore any IP reallocate requests - only recmaster
3686                  * processes them
3687                  */
3688                 TALLOC_FREE(rec->reallocate_requests);
3689                 /* Clear any nodes that should be force rebalanced in
3690                  * the next takeover run.  If the recovery master role
3691                  * has moved then we don't want to process these some
3692                  * time in the future.
3693                  */
3694                 TALLOC_FREE(rec->force_rebalance_nodes);
3695         }
3696
3697         /* This is a special case.  When the recovery daemon is started,
3698          * recmaster is set to -1.  If the node was not started in the
3699          * stopped state, start an election to decide the recovery master.
3700          */
3701         if (rec->recmaster == (uint32_t)-1) {
3702                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master not yet set - forcing election\n"));
3703                 force_election(rec, pnn, nodemap);
3704                 return;
3705         }
3706
3707         /* update the capabilities for all nodes */
3708         ret = update_capabilities(ctdb, nodemap);
3709         if (ret != 0) {
3710                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3711                 return;
3712         }
3713
3714         /*
3715          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3716          * but we have, then force an election and try to become the new
3717          * recmaster.
3718          */
3719         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3720             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3721              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3722                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3723                                   " but we (node %u) have - force an election\n",
3724                                   rec->recmaster, pnn));
3725                 force_election(rec, pnn, nodemap);
3726                 return;
3727         }
3728
3729         /* count how many active nodes there are */
3730         rec->num_active    = 0;
3731         rec->num_lmasters  = 0;
3732         rec->num_connected = 0;
3733         for (i=0; i<nodemap->num; i++) {
3734                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3735                         rec->num_active++;
3736                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3737                                 rec->num_lmasters++;
3738                         }
3739                 }
3740                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3741                         rec->num_connected++;
3742                 }
3743         }
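
        /* Worked example: in a four-node cluster where node 3 is banned,
           num_active is 3 (banned implies NODE_FLAGS_INACTIVE), while
           num_connected is typically still 4, since a banned node remains
           reachable.  num_lmasters counts only the active nodes that
           advertise CTDB_CAP_LMASTER; vnnmap->size is checked against it
           further down.
         */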
3744
3745
3746         /* verify that the recmaster node is still active */
3747         for (j=0; j<nodemap->num; j++) {
3748                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3749                         break;
3750                 }
3751         }
3752
3753         if (j == nodemap->num) {
3754                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3755                 force_election(rec, pnn, nodemap);
3756                 return;
3757         }
3758
3759         /* if recovery master is disconnected we must elect a new recmaster */
3760         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3761                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3762                 force_election(rec, pnn, nodemap);
3763                 return;
3764         }
3765
3766         /* get nodemap from the recovery master to check if it is inactive */
3767         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3768                                    mem_ctx, &recmaster_nodemap);
3769         if (ret != 0) {
3770                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3771                           nodemap->nodes[j].pnn));
3772                 return;
3773         }
3774
3775
3776         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3777             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3778                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3779                 /*
3780                  * update our nodemap to carry the recmaster's notion of
3781                  * its own flags, so that we don't keep freezing the
3782                  * inactive recmaster node...
3783                  */
3784                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3785                 force_election(rec, pnn, nodemap);
3786                 return;
3787         }
3788
3789         /* verify that we have all ip addresses we should have and we don't
3790          * have addresses we shouldn't have.
3791          */
3792         if (ctdb->tunable.disable_ip_failover == 0 &&
3793             rec->takeover_runs_disable_ctx == NULL) {
3794                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3795                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3796                 }
3797         }
3798
3799
3800         /* if we are not the recmaster then we do not need to check
3801            if recovery is needed
3802          */
3803         if (pnn != rec->recmaster) {
3804                 return;
3805         }
3806
3807
3808         /* ensure our local copies of flags are right */
3809         ret = update_local_flags(rec, nodemap);
3810         if (ret == MONITOR_ELECTION_NEEDED) {
3811                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3812                 force_election(rec, pnn, nodemap);
3813                 return;
3814         }
3815         if (ret != MONITOR_OK) {
3816                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3817                 return;
3818         }
3819
3820         if (ctdb->num_nodes != nodemap->num) {
3821                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3822                 ctdb_load_nodes_file(ctdb);
3823                 return;
3824         }
3825
3826         /* verify that all active nodes agree that we are the recmaster */
3827         switch (verify_recmaster(rec, nodemap, pnn)) {
3828         case MONITOR_RECOVERY_NEEDED:
3829                 /* cannot happen */
3830                 return;
3831         case MONITOR_ELECTION_NEEDED:
3832                 force_election(rec, pnn, nodemap);
3833                 return;
3834         case MONITOR_OK:
3835                 break;
3836         case MONITOR_FAILED:
3837                 return;
3838         }
3839
3840
3841         if (rec->need_recovery) {
3842                 /* a previous recovery didn't finish */
3843                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3844                 return;
3845         }
3846
3847         /* verify that all active nodes are in normal mode 
3848            and not in recovery mode 
3849         */
3850         switch (verify_recmode(ctdb, nodemap)) {
3851         case MONITOR_RECOVERY_NEEDED:
3852                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3853                 return;
3854         case MONITOR_FAILED:
3855                 return;
3856         case MONITOR_ELECTION_NEEDED:
3857                 /* cannot happen */
3858         case MONITOR_OK:
3859                 break;
3860         }
3861
3862
3863         if (ctdb->tunable.verify_recovery_lock != 0) {
3864                 /* we should have the reclock - check it's not stale */
3865                 ret = check_recovery_lock(ctdb);
3866                 if (ret != 0) {
3867                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3868                         ctdb_set_culprit(rec, ctdb->pnn);
3869                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3870                         return;
3871                 }
3872         }
3873
3874
3875         /* if there are takeover runs requested, perform them and notify the waiters */
3876         if (rec->takeover_runs_disable_ctx == NULL &&
3877             rec->reallocate_requests) {
3878                 process_ipreallocate_requests(ctdb, rec);
3879         }
3880
3881         /* get the nodemap for all active remote nodes
3882          */
3883         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3884         if (remote_nodemaps == NULL) {
3885                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3886                 return;
3887         }
3888         for(i=0; i<nodemap->num; i++) {
3889                 remote_nodemaps[i] = NULL;
3890         }
3891         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3892                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3893                 return;
3894         } 
3895
3896         /* verify that all other nodes have the same nodemap as we have
3897         */
3898         for (j=0; j<nodemap->num; j++) {
3899                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3900                         continue;
3901                 }
3902
3903                 if (remote_nodemaps[j] == NULL) {
3904                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3905                         ctdb_set_culprit(rec, j);
3906
3907                         return;
3908                 }
3909
3910                 /* if the nodes disagree on how many nodes there are
3911                    then this is a good reason to try recovery
3912                  */
3913                 if (remote_nodemaps[j]->num != nodemap->num) {
3914                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3915                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3916                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3917                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3918                         return;
3919                 }
3920
3921                 /* if the nodes disagree on which nodes exist and are
3922                    active, then that is also a good reason to do recovery
3923                  */
3924                 for (i=0;i<nodemap->num;i++) {
3925                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3926                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3927                                           nodemap->nodes[j].pnn, i, 
3928                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3929                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3930                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3931                                             vnnmap);
3932                                 return;
3933                         }
3934                 }
3935         }
3936
3937         /*
3938          * Update node flags obtained from each active node. This ensures we have
3939          * up-to-date information for all the nodes.
3940          */
3941         for (j=0; j<nodemap->num; j++) {
3942                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3943                         continue;
3944                 }
3945                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3946         }
3947
3948         for (j=0; j<nodemap->num; j++) {
3949                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3950                         continue;
3951                 }
3952
3953                 /* verify the flags are consistent
3954                 */
3955                 for (i=0; i<nodemap->num; i++) {
3956                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3957                                 continue;
3958                         }
3959                         
3960                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3961                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3962                                   nodemap->nodes[j].pnn, 
3963                                   nodemap->nodes[i].pnn, 
3964                                   remote_nodemaps[j]->nodes[i].flags,
3965                                   nodemap->nodes[i].flags));
3966                                 if (i == j) {
3967                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3968                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3969                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3970                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3971                                                     vnnmap);
3972                                         return;
3973                                 } else {
3974                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3975                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3976                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3977                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3978                                                     vnnmap);
3979                                         return;
3980                                 }
3981                         }
3982                 }
3983         }
3984
3985
3986         /* There must be the same number of lmasters in the vnn map as
3987          * there are active nodes with the lmaster capability...  or
3988          * do a recovery.
3989          */
3990         if (vnnmap->size != rec->num_lmasters) {
3991                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3992                           vnnmap->size, rec->num_lmasters));
3993                 ctdb_set_culprit(rec, ctdb->pnn);
3994                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3995                 return;
3996         }
3997
3998         /* verify that all active nodes in the nodemap also exist in 
3999            the vnnmap.
4000          */
4001         for (j=0; j<nodemap->num; j++) {
4002                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4003                         continue;
4004                 }
4005                 if (nodemap->nodes[j].pnn == pnn) {
4006                         continue;
4007                 }
4008
4009                 for (i=0; i<vnnmap->size; i++) {
4010                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
4011                                 break;
4012                         }
4013                 }
4014                 if (i == vnnmap->size) {
4015                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
4016                                   nodemap->nodes[j].pnn));
4017                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4018                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4019                         return;
4020                 }
4021         }
4022
4023         
4024         /* verify that all other nodes have the same vnnmap
4025            and are from the same generation
4026          */
4027         for (j=0; j<nodemap->num; j++) {
4028                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4029                         continue;
4030                 }
4031                 if (nodemap->nodes[j].pnn == pnn) {
4032                         continue;
4033                 }
4034
4035                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4036                                           mem_ctx, &remote_vnnmap);
4037                 if (ret != 0) {
4038                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4039                                   nodemap->nodes[j].pnn));
4040                         return;
4041                 }
4042
4043                 /* verify the vnnmap generation is the same */
4044                 if (vnnmap->generation != remote_vnnmap->generation) {
4045                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap generation: %u vs %u (ours)\n", 
4046                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4047                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4048                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4049                         return;
4050                 }
4051
4052                 /* verify the vnnmap size is the same */
4053                 if (vnnmap->size != remote_vnnmap->size) {
4054                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap size: %u vs %u (ours)\n", 
4055                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4056                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4057                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4058                         return;
4059                 }
4060
4061                 /* verify the vnnmap is the same */
4062                 for (i=0; i<vnnmap->size; i++) {
4063                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4064                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap.\n", 
4065                                           nodemap->nodes[j].pnn));
4066                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4067                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4068                                             vnnmap);
4069                                 return;
4070                         }
4071                 }
4072         }
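
        /* Reaching this point means every remote vnnmap matched ours in
         * generation, size and content; any mismatch above marked the
         * offending node as culprit and forced a recovery so that a
         * single consistent vnnmap is rebuilt cluster-wide. */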
4073
4074         /* we might need to change who has what IP assigned */
4075         if (rec->need_takeover_run) {
4076                 uint32_t culprit = (uint32_t)-1;
4077
4078                 rec->need_takeover_run = false;
4079
4080                 /* update the list of public ips that a node can handle for
4081                    all connected nodes
4082                 */
4083                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4084                 if (ret != 0) {
4085                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4086                                          culprit));
4087                         rec->need_takeover_run = true;
4088                         return;
4089                 }
4090
4091                 /* execute the "startrecovery" event script on all nodes */
4092                 ret = run_startrecovery_eventscript(rec, nodemap);
4093                 if (ret != 0) {
4094                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4095                         ctdb_set_culprit(rec, ctdb->pnn);
4096                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4097                         return;
4098                 }
4099
4100                 /* If the takeover run fails, the offending nodes are
4101                  * assigned ban culprit counts and the takeover is
4102                  * retried; a node that fails repeatedly is banned.
4103                  *
4104                  * If rec->need_takeover_run is not set back to true on
4105                  * such a failure, monitoring stays disabled cluster-wide
4106                  * (via the startrecovery eventscript) and is never
4107                  * re-enabled.
4108                  */
4109                 if (!do_takeover_run(rec, nodemap, true)) {
4110                         return;
4111                 }
4112
4113                 /* execute the "recovered" event script on all nodes */
4114                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4115 #if 0
4116 /* We can't check whether the event completed successfully, since
4117  * this script WILL fail if the node is in recovery mode; if that
4118  * race happened, the code here would just cause a second,
4119  * cascading recovery. */
4120                 if (ret != 0) {
4121                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4122                         ctdb_set_culprit(rec, ctdb->pnn);
4123                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4124                 }
4125 #endif
4126         }
4127 }
4128
4129 /*
4130   the main monitoring loop
4131  */
4132 static void monitor_cluster(struct ctdb_context *ctdb)
4133 {
4134         struct ctdb_recoverd *rec;
4135
4136         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4137
4138         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4139         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4140
4141         rec->ctdb = ctdb;
4142
4143         rec->takeover_run_in_progress = false;
4144
4145         rec->priority_time = timeval_current();
4146
4147         /* register a message port for sending memory dumps */
4148         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4149
4150         /* register a message port for requesting logs */
4151         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4152
4153         /* register a message port for clearing logs */
4154         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4155
4156         /* register a message port for recovery elections */
4157         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4158
4159         /* when nodes are disabled/enabled */
4160         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4161
4162         /* when we are asked to push out a flag change */
4163         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4164
4165         /* register a message port for vacuum fetch */
4166         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4167
4168         /* register a message port for reloadnodes  */
4169         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4170
4171         /* register a message port for performing a takeover run */
4172         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4173
4174         /* register a message port for disabling the ip check for a short while */
4175         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4176
4177         /* register a message port for updating the recovery daemons node assignment for an ip */
4178         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4179
4180         /* register a message port for forcing a rebalance of a node next
4181            reallocation */
4182         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4183
4184         /* Register a message port for disabling takeover runs */
4185         ctdb_client_set_message_handler(ctdb,
4186                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4187                                         disable_takeover_runs_handler, rec);
4188
4189         /* register a message port for detaching database */
4190         ctdb_client_set_message_handler(ctdb,
4191                                         CTDB_SRVID_DETACH_DATABASE,
4192                                         detach_database_handler, rec);
4193
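        /* For illustration only, a hedged sketch that is not part of this
         * file: another ctdb client can reach any of the handlers
         * registered above by sending a message to the matching SRVID.
         * For example, to request a takeover run (client_ctdb and
         * recmaster_pnn are assumed to come from the caller's own
         * connection and recmaster lookup):
         *
         *   struct srvid_request rq;
         *   TDB_DATA data;
         *
         *   rq.pnn = ctdb_get_pnn(client_ctdb);
         *   rq.srvid = 0;       /* srvid == 0 means "no reply wanted" */
         *   data.dptr = (uint8_t *)&rq;
         *   data.dsize = sizeof(rq);
         *   ctdb_client_send_message(client_ctdb, recmaster_pnn,
         *                            CTDB_SRVID_TAKEOVER_RUN, data);
         */
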
4194         for (;;) {
4195                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4196                 struct timeval start;
4197                 double elapsed;
4198
4199                 if (!mem_ctx) {
4200                         DEBUG(DEBUG_CRIT,(__location__
4201                                           " Failed to create temp context\n"));
4202                         exit(-1);
4203                 }
4204
4205                 start = timeval_current();
4206                 main_loop(ctdb, rec, mem_ctx);
4207                 talloc_free(mem_ctx);
4208
4209                 /* we only check for recovery once every recover_interval seconds */
4210                 elapsed = timeval_elapsed(&start);
4211                 if (elapsed < ctdb->tunable.recover_interval) {
4212                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4213                                           - elapsed);
4214                 }
4215         }
4216 }
4217
4218 /*
4219   event handler for when the main ctdbd dies
4220  */
4221 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4222                                  uint16_t flags, void *private_data)
4223 {
4224         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4225         _exit(1);
4226 }
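
/* Note on the mechanism: ctdb_start_recoverd() below creates a pipe; the
 * parent keeps the write end and the child watches the read end.  When
 * the parent dies the kernel closes its write end, the read end becomes
 * readable (EOF) and the handler above fires. */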
4227
4228 /*
4229   called regularly to verify that the recovery daemon is still running
4230  */
4231 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4232                               struct timeval yt, void *p)
4233 {
4234         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4235
4236         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4237                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4238
4239                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4240                                 ctdb_restart_recd, ctdb);
4241
4242                 return;
4243         }
4244
4245         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4246                         timeval_current_ofs(30, 0),
4247                         ctdb_check_recd, ctdb);
4248 }
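
/* The liveness probe above uses the standard signal-0 idiom: sending
 * signal 0 delivers nothing but still performs the existence check,
 * i.e. roughly:
 *
 *   if (kill(pid, 0) != 0 && errno == ESRCH) {
 *           ... the process is gone ...
 *   }
 */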
4249
4250 static void recd_sig_child_handler(struct event_context *ev,
4251         struct signal_event *se, int signum, int count,
4252         void *dont_care, 
4253         void *private_data)
4254 {
4256         int status;
4257         pid_t pid = -1;
4258
4259         while (pid != 0) {
4260                 pid = waitpid(-1, &status, WNOHANG);
4261                 if (pid == -1) {
4262                         if (errno != ECHILD) {
4263                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4264                         }
4265                         return;
4266                 }
4267                 if (pid > 0) {
4268                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4269                 }
4270         }
4271 }
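
/* The WNOHANG loop above reaps every child that has already exited:
 * waitpid() returns a pid while reapable children remain, 0 once none
 * are left to reap, and -1 with errno == ECHILD when there are no
 * children at all. */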
4272
4273 /*
4274   startup the recovery daemon as a child of the main ctdb daemon
4275  */
4276 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4277 {
4278         int fd[2];
4279         struct signal_event *se;
4280         struct tevent_fd *fde;
4281
4282         if (pipe(fd) != 0) {
4283                 return -1;
4284         }
4285
4286         ctdb->ctdbd_pid = getpid();
4287
4288         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4289         if (ctdb->recoverd_pid == -1) {
4290                 return -1;
4291         }
4292
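        /* Parent process: keep the pipe's write end open and poll the
         * child's liveness every 30 seconds. */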
4293         if (ctdb->recoverd_pid != 0) {
4294                 talloc_free(ctdb->recd_ctx);
4295                 ctdb->recd_ctx = talloc_new(ctdb);
4296                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4297
4298                 close(fd[0]);
4299                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4300                                 timeval_current_ofs(30, 0),
4301                                 ctdb_check_recd, ctdb);
4302                 return 0;
4303         }
4304
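        /* Child process: from here on this is the recovery daemon. */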
4305         close(fd[1]);
4306
4307         srandom(getpid() ^ time(NULL));
4308
4309         /* Clear the log ringbuffer */
4310         ctdb_clear_log(ctdb);
4311
4312         ctdb_set_process_name("ctdb_recoverd");
4313         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4314                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4315                 exit(1);
4316         }
4317
4318         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4319
4320         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4321                      ctdb_recoverd_parent, &fd[0]);
4322         tevent_fd_set_auto_close(fde);
4323
4324         /* set up a handler to pick up sigchld */
4325         se = event_add_signal(ctdb->ev, ctdb,
4326                                      SIGCHLD, 0,
4327                                      recd_sig_child_handler,
4328                                      ctdb);
4329         if (se == NULL) {
4330                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4331                 exit(1);
4332         }
4333
4334         monitor_cluster(ctdb);
4335
4336         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4337         return -1;
4338 }
4339
4340 /*
4341   shutdown the recovery daemon
4342  */
4343 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4344 {
4345         if (ctdb->recoverd_pid == 0) {
4346                 return;
4347         }
4348
4349         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4350         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4351
4352         TALLOC_FREE(ctdb->recd_ctx);
4353         TALLOC_FREE(ctdb->recd_ping_count);
4354 }
4355
4356 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4357                        struct timeval t, void *private_data)
4358 {
4359         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4360
4361         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4362         ctdb_stop_recoverd(ctdb);
4363         ctdb_start_recoverd(ctdb);
4364 }