/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"


/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

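/* Send the result of an operation back to the node/srvid that
   requested it, then free the request.  A request with srvid == 0 is
   fire-and-forget and is freed without replying. */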
static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct srvid_request *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

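/* Queue a request so that it can be answered later with a single
   result via srvid_requests_reply().  The request is stolen onto the
   queue; if the queue cannot be allocated, an -ENOMEM result is sent
   straight back to the requester. */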
static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct srvid_request *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct srvid_request *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

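/* Per-node misbehaviour accounting.  Nodes earn "culprit" credits when
   they cause recovery failures; the count is reset once a node has
   behaved for longer than the recovery grace period, and the recovery
   master uses the count to decide when to ban a node. */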
struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_lmasters;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
        struct srvid_requests *reallocate_requests;
        bool takeover_run_in_progress;
        TALLOC_CTX *takeover_runs_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
        uint32_t *force_rebalance_nodes;
};

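/* Both timeouts are driven by tunables so they can be adapted to the
   cluster: RecoverTimeout bounds individual recovery controls,
   RecoverInterval paces the monitoring loop. */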
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the troublemaker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved, so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the troublemaker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

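/* The eventscript runs below fan a control out to every listed node
   via ctdb_client_async_control(); any node that fails the control is
   reported through the fail callback and charged culprit credits. */
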
/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
                DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
                ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
        }

        if (node_pnn == ctdb->pnn) {
                ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
        }
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
{
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        /* freeze all nodes */
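        /* Databases are frozen one priority class at a time
           (1..NUM_DB_PRIORITIES); note that the priority is passed in
           the slot that other ctdb_client_async_control() calls here
           use for the srvid. */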
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* update all remote nodes to use the same db priority that we have.
   This can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map *nodemap,
        uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].dbid;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority));

                ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
                                                CTDB_CURRENT_NODE, &db_prio);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
                                         db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].dbid, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

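        /* The pulldb reply is a ctdb_marshall_buffer: reply->count
           ctdb_rec_data records packed back to back.  Each record's
           data blob starts with a ctdb_ltdb_header, and rec->length is
           the full on-wire size of a record, so it doubles as the
           stride when stepping to the next record. */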
        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
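                        /* Merge rule: keep the copy already in the
                           recdb unless the pulled record has a higher
                           RSN, or the RSNs are equal and the stored
                           copy's dmaster is not the recovery master. */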
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}


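/* For persistent databases, recovery can take the whole database from
   the single node with the highest sequence number instead of merging
   record by record.  The callbacks below collect each node's seqnum
   and remember which node reported the highest one. */
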
struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum ||
            (cb_data->pnn == -1 && seqnum == 0)) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
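  (ctdb_ctrl_modflags() takes a set mask and a clear mask; passing
  flags and ~flags makes the remote flag word exactly equal to flags)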
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }
        }

        return 0;
}


struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
};

static void vacuum_fetch_next(struct vacuum_info *v);

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/*
  process the next element from the vacuum list
*/
static void vacuum_fetch_next(struct vacuum_info *v)
{
        struct ctdb_call call;
        struct ctdb_rec_data *r;

        while (v->recs->count) {
                struct ctdb_client_call_state *state;
                TDB_DATA data;
                struct ctdb_ltdb_header *hdr;

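                /* A CTDB_NULL_FUNC call runs no call function;
                   combined with CTDB_IMMEDIATE_MIGRATION its only
                   effect is to migrate the record onto this node,
                   which is all the vacuum fetch needs. */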
                ZERO_STRUCT(call);
                call.call_id = CTDB_NULL_FUNC;
                call.flags = CTDB_IMMEDIATE_MIGRATION;
                call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

                r = v->r;
                v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
                v->recs->count--;

                call.key.dptr = &r->data[0];
                call.key.dsize = r->keylen;

                /* ensure we don't block this daemon - just skip a record if we can't get
                   the chainlock */
                if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
                        continue;
                }

                data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
                if (data.dptr == NULL) {
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                hdr = (struct ctdb_ltdb_header *)data.dptr;
                if (hdr->dmaster == v->rec->ctdb->pnn) {
                        /* it's already local */
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                free(data.dptr);

                state = ctdb_call_send(v->ctdb_db, &call);
                tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                if (state == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                        talloc_free(v);
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
                state->async.private_data = NULL;
        }

        talloc_free(v);
}


/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
        DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
}


/*
  handler for vacuum fetch
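
  A remote node sends us a marshalled buffer of records it wants
  migrated back to this node; the reqid field of the first record
  carries the sender's pnn.  If a batch from that node for the same
  database is already being processed, the new request is ignored.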
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data *r;
        uint32_t srcnode;
        struct vacuum_info *v;

        recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];

        if (recs->count == 0) {
                talloc_free(tmp_ctx);
                return;
        }

        srcnode = r->reqid;

        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
                        talloc_free(tmp_ctx);
                        return;
                }
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                talloc_free(tmp_ctx);
                return;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].dbid == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
                return;
        }

        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(tmp_ctx);
                return;
        }

        v->rec = rec;
        v->srcnode = srcnode;
        v->ctdb_db = ctdb_db;
        v->recs = talloc_memdup(v, recs, data.dsize);
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
                talloc_free(tmp_ctx);
                return;
        }
        v->r = (struct ctdb_rec_data *)&v->recs->data[0];

        DLIST_ADD(rec->vacuum_info, v);

        talloc_set_destructor(v, vacuum_info_destructor);

        vacuum_fetch_next(v);
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                    TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data,
                                                    struct ctdb_recoverd);
        uint32_t db_id;
        struct vacuum_info *v, *vnext;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        /* Stop any active vacuum fetch */
        v = rec->vacuum_info;
        while (v != NULL) {
                vnext = v->next;

                if (v->ctdb_db->db_id == db_id) {
                        talloc_free(v);
                }
                v = vnext;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
}

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                event_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or we believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return MONITOR_FAILED;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return MONITOR_OK;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}


/*
  create a temporary working database
 */
static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
{
        char *name;
        struct tdb_wrap *recdb;
        unsigned tdb_flags;

        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
                               ctdb->db_directory_state,
                               ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);

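        /* The recdb is private to this recovery daemon, so no locking
           is needed (TDB_NOLOCK); mmap is avoided when running under
           valgrind so that accesses stay visible to it. */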
        tdb_flags = TDB_NOLOCK;
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);

        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size,
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }

        talloc_free(name);

        return recdb;
}


/*
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
        bool persistent;
};

static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct recdb_data *params = (struct recdb_data *)p;
        struct ctdb_rec_data *rec;
        struct ctdb_ltdb_header *hdr;

        /*
         * skip empty records - but NOT for persistent databases:
         *
         * The record-by-record mode of recovery deletes empty records.
         * For persistent databases, this can lead to data corruption
         * by deleting records that should be there:
         *
         * - Assume the cluster has been running for a while.
         *
         * - A record R in a persistent database has been created and
         *   deleted a couple of times, the last operation being deletion,
         *   leaving an empty record with a high RSN, say 10.
         *
         * - Now a node N is turned off.
         *
         * - This leaves the local copy of the database on N with the empty
         *   copy of R and RSN 10. On all other nodes, the recovery has deleted
         *   the copy of record R.
         *
         * - Now the record is created again while node N is turned off.
         *   This creates R with RSN = 1 on all nodes except for N.
         *
         * - Now node N is turned on again. The following recovery will choose
         *   the older empty copy of R due to RSN 10 > RSN 1.
         *
         * ==> Hence the record is gone after the recovery.
         *
         * On databases like Samba's registry, this can damage the higher-level
         * data structures built from the various tdb-level records.
         */
        if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
                return 0;
        }

        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (!params->persistent) {
                hdr->dmaster = params->ctdb->pnn;
                hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
        }

        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
        if (rec == NULL) {
                params->failed = true;
                return -1;
        }
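        /* grow the marshall buffer in pulldb_preallocation_size-sized
           chunks rather than per record, to limit the number of
           reallocs during the traverse */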
        if (params->len + rec->length >= params->allocated_len) {
                params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
                         rec->length + params->len));
                params->failed = true;
                return -1;
        }
        params->recdata->count++;
        memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
        params->len += rec->length;
        talloc_free(rec);

        return 0;
}
1382
1383 /*
1384   push the recdb database out to all nodes
1385  */
1386 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1387                                bool persistent,
1388                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1389 {
1390         struct recdb_data params;
1391         struct ctdb_marshall_buffer *recdata;
1392         TDB_DATA outdata;
1393         TALLOC_CTX *tmp_ctx;
1394         uint32_t *nodes;
1395
1396         tmp_ctx = talloc_new(ctdb);
1397         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1398
1399         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1400         CTDB_NO_MEMORY(ctdb, recdata);
1401
1402         recdata->db_id = dbid;
1403
1404         params.ctdb = ctdb;
1405         params.recdata = recdata;
1406         params.len = offsetof(struct ctdb_marshall_buffer, data);
1407         params.allocated_len = params.len;
1408         params.failed = false;
1409         params.persistent = persistent;
1410
1411         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1412                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1413                 talloc_free(params.recdata);
1414                 talloc_free(tmp_ctx);
1415                 return -1;
1416         }
1417
1418         if (params.failed) {
1419                 DEBUG(DEBUG_ERR,(__location__ " Failed to marshall records from recdb database\n"));
1420                 talloc_free(params.recdata);
1421                 talloc_free(tmp_ctx);
1422                 return -1;
1423         }
1424
1425         recdata = params.recdata;
1426
1427         outdata.dptr = (void *)recdata;
1428         outdata.dsize = params.len;
1429
1430         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1431         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1432                                         nodes, 0,
1433                                         CONTROL_TIMEOUT(), false, outdata,
1434                                         NULL, NULL,
1435                                         NULL) != 0) {
1436                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1437                 talloc_free(recdata);
1438                 talloc_free(tmp_ctx);
1439                 return -1;
1440         }
1441
1442         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n", 
1443                   dbid, recdata->count));
1444
1445         talloc_free(recdata);
1446         talloc_free(tmp_ctx);
1447
1448         return 0;
1449 }
1450
1451
1452 /*
1453   go through a full recovery on one database 
1454  */
1455 static int recover_database(struct ctdb_recoverd *rec, 
1456                             TALLOC_CTX *mem_ctx,
1457                             uint32_t dbid,
1458                             bool persistent,
1459                             uint32_t pnn, 
1460                             struct ctdb_node_map *nodemap,
1461                             uint32_t transaction_id)
1462 {
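             /* A full recovery of one database is a three step process:
              *   1. pull every remote copy of the database into the local
              *      recdb
              *   2. wipe the database on all active nodes (safe, since this
              *      happens inside the recovery transaction)
              *   3. push the merged recdb back out to all active nodes
              */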
1463         struct tdb_wrap *recdb;
1464         int ret;
1465         struct ctdb_context *ctdb = rec->ctdb;
1466         TDB_DATA data;
1467         struct ctdb_control_wipe_database w;
1468         uint32_t *nodes;
1469
1470         recdb = create_recdb(ctdb, mem_ctx);
1471         if (recdb == NULL) {
1472                 return -1;
1473         }
1474
1475         /* pull all remote databases onto the recdb */
1476         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1477         if (ret != 0) {
1478                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1479                 return -1;
1480         }
1481
1482         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1483
1484         /* wipe all the remote databases. This is safe as we are in a transaction */
1485         w.db_id = dbid;
1486         w.transaction_id = transaction_id;
1487
1488         data.dptr = (void *)&w;
1489         data.dsize = sizeof(w);
1490
1491         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1492         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1493                                         nodes, 0,
1494                                         CONTROL_TIMEOUT(), false, data,
1495                                         NULL, NULL,
1496                                         NULL) != 0) {
1497                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1498                 talloc_free(recdb);
1499                 return -1;
1500         }
1501         
1502         /* push out the correct database. This sets the dmaster and skips 
1503            the empty records */
1504         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1505         if (ret != 0) {
1506                 talloc_free(recdb);
1507                 return -1;
1508         }
1509
1510         /* all done with this database */
1511         talloc_free(recdb);
1512
1513         return 0;
1514 }
1515
1516 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1517                                          struct ctdb_recoverd *rec,
1518                                          struct ctdb_node_map *nodemap,
1519                                          uint32_t *culprit)
1520 {
1521         int j;
1522         int ret;
1523
1524         if (ctdb->num_nodes != nodemap->num) {
1525                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1526                                   ctdb->num_nodes, nodemap->num));
1527                 if (culprit) {
1528                         *culprit = ctdb->pnn;
1529                 }
1530                 return -1;
1531         }
1532
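             /* For each node: drop any cached IP lists, then ask every
              * active node for its full list of known public IPs and for
              * the subset that is currently available on that node. */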
1533         for (j=0; j<nodemap->num; j++) {
1534                 /* For readability */
1535                 struct ctdb_node *node = ctdb->nodes[j];
1536
1537                 /* release any existing data */
1538                 if (node->known_public_ips) {
1539                         talloc_free(node->known_public_ips);
1540                         node->known_public_ips = NULL;
1541                 }
1542                 if (node->available_public_ips) {
1543                         talloc_free(node->available_public_ips);
1544                         node->available_public_ips = NULL;
1545                 }
1546
1547                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1548                         continue;
1549                 }
1550
1551                 /* Retrieve the list of known public IPs from the node */
1552                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1553                                         CONTROL_TIMEOUT(),
1554                                         node->pnn,
1555                                         ctdb->nodes,
1556                                         0,
1557                                         &node->known_public_ips);
1558                 if (ret != 0) {
1559                         DEBUG(DEBUG_ERR,
1560                               ("Failed to read known public IPs from node: %u\n",
1561                                node->pnn));
1562                         if (culprit) {
1563                                 *culprit = node->pnn;
1564                         }
1565                         return -1;
1566                 }
1567
1568                 if (ctdb->do_checkpublicip &&
1569                     rec->takeover_runs_disable_ctx == NULL &&
1570                     verify_remote_ip_allocation(ctdb,
1571                                                  node->known_public_ips,
1572                                                  node->pnn)) {
1573                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1574                         rec->need_takeover_run = true;
1575                 }
1576
1577                 /* Retrieve the list of available public IPs from the node */
1578                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1579                                         CONTROL_TIMEOUT(),
1580                                         node->pnn,
1581                                         ctdb->nodes,
1582                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1583                                         &node->available_public_ips);
1584                 if (ret != 0) {
1585                         DEBUG(DEBUG_ERR,
1586                               ("Failed to read available public IPs from node: %u\n",
1587                                node->pnn));
1588                         if (culprit) {
1589                                 *culprit = node->pnn;
1590                         }
1591                         return -1;
1592                 }
1593         }
1594
1595         return 0;
1596 }
1597
1598 /* when we start a recovery, make sure all nodes use the same reclock file
1599    setting
1600 */
1601 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1602 {
1603         struct ctdb_context *ctdb = rec->ctdb;
1604         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1605         TDB_DATA data;
1606         uint32_t *nodes;
1607
1608         if (ctdb->recovery_lock_file == NULL) {
1609                 data.dptr  = NULL;
1610                 data.dsize = 0;
1611         } else {
1612                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1613                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1614         }
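             /* an empty payload means "no reclock file"; broadcasting it
              * makes the other nodes drop their reclock file setting too */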
1615
1616         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1617         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1618                                         nodes, 0,
1619                                         CONTROL_TIMEOUT(),
1620                                         false, data,
1621                                         NULL, NULL,
1622                                         rec) != 0) {
1623                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1624                 talloc_free(tmp_ctx);
1625                 return -1;
1626         }
1627
1628         talloc_free(tmp_ctx);
1629         return 0;
1630 }
1631
1632
1633 /*
1634  * this callback is called for every node that failed to execute ctdb_takeover_run();
1635  * it marks the node as a culprit so that it collects banning credits.
1636  */
1637 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1638 {
1639         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1640
1641         if (callback_data != NULL) {
1642                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1643
1644                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1645
1646                 ctdb_set_culprit(rec, node_pnn);
1647         }
1648 }
1649
1650
1651 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1652 {
1653         struct ctdb_context *ctdb = rec->ctdb;
1654         int i;
1655         struct ctdb_banning_state *ban_state;
1656
1657         *self_ban = false;
1658         for (i=0; i<ctdb->num_nodes; i++) {
1659                 if (ctdb->nodes[i]->ban_state == NULL) {
1660                         continue;
1661                 }
1662                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1663                 if (ban_state->count < 2*ctdb->num_nodes) {
1664                         continue;
1665                 }
1666
1667                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1668                         ctdb->nodes[i]->pnn, ban_state->count,
1669                         ctdb->tunable.recovery_ban_period));
1670                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1671                 ban_state->count = 0;
1672
1673                 /* Banning ourself? */
1674                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1675                         *self_ban = true;
1676                 }
1677         }
1678 }
1679
1680 static bool do_takeover_run(struct ctdb_recoverd *rec,
1681                             struct ctdb_node_map *nodemap,
1682                             bool banning_credits_on_fail)
1683 {
1684         uint32_t *nodes = NULL;
1685         struct srvid_request_data dtr;
1686         TDB_DATA data;
1687         int i;
1688         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1689         int ret;
1690         bool ok;
1691
1692         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1693
1694         if (rec->takeover_run_in_progress) {
1695                 DEBUG(DEBUG_ERR, (__location__
1696                                   " takeover run already in progress\n"));
1697                 ok = false;
1698                 goto done;
1699         }
1700
1701         rec->takeover_run_in_progress = true;
1702
1703         /* If takeover runs are disabled then fail... */
1704         if (rec->takeover_runs_disable_ctx != NULL) {
1705                 DEBUG(DEBUG_ERR,
1706                       ("Takeover runs are disabled so refusing to run one\n"));
1707                 ok = false;
1708                 goto done;
1709         }
1710
1711         /* Disable IP checks (takeover runs, really) on other nodes
1712          * while doing this takeover run.  This will stop those other
1713          * nodes from triggering takeover runs when they think they should
1714          * be hosting an IP but it isn't yet on an interface.  Don't
1715          * wait for replies since a failure here might cause some
1716          * noise in the logs but will not actually cause a problem.
1717          */
1718         dtr.srvid = 0; /* No reply */
1719         dtr.pnn = -1;
1720
1721         data.dptr  = (uint8_t*)&dtr;
1722         data.dsize = sizeof(dtr);
1723
1724         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1725
1726         /* Disable for 60 seconds.  This can be a tunable later if
1727          * necessary.
1728          */
1729         dtr.data = 60;
1730         for (i = 0; i < talloc_array_length(nodes); i++) {
1731                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1732                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1733                                              data) != 0) {
1734                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1735                 }
1736         }
1737
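             /* Passing rec as the callback private data makes
              * takeover_fail_callback() assign banning credits to nodes
              * that fail the run; passing NULL makes failures only be
              * logged. */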
1738         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1739                                 rec->force_rebalance_nodes,
1740                                 takeover_fail_callback,
1741                                 banning_credits_on_fail ? rec : NULL);
1742
1743         /* Reenable takeover runs and IP checks on other nodes */
1744         dtr.data = 0;
1745         for (i = 0; i < talloc_array_length(nodes); i++) {
1746                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1747                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1748                                              data) != 0) {
1749                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1750                 }
1751         }
1752
1753         if (ret != 0) {
1754                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1755                 ok = false;
1756                 goto done;
1757         }
1758
1759         ok = true;
1760         /* Takeover run was successful so clear force rebalance targets */
1761         if (rebalance_nodes == rec->force_rebalance_nodes) {
1762                 TALLOC_FREE(rec->force_rebalance_nodes);
1763         } else {
1764                 DEBUG(DEBUG_WARNING,
1765                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1766         }
1767 done:
1768         rec->need_takeover_run = !ok;
1769         talloc_free(nodes);
1770         rec->takeover_run_in_progress = false;
1771
1772         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1773         return ok;
1774 }
1775
1776
1777 /*
1778   we are the recmaster, and recovery is needed - start a recovery run
1779  */
1780 static int do_recovery(struct ctdb_recoverd *rec, 
1781                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1782                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1783 {
1784         struct ctdb_context *ctdb = rec->ctdb;
1785         int i, j, ret;
1786         uint32_t generation;
1787         struct ctdb_dbid_map *dbmap;
1788         TDB_DATA data;
1789         uint32_t *nodes;
1790         struct timeval start_time;
1791         uint32_t culprit = (uint32_t)-1;
1792         bool self_ban;
1793
1794         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1795
1796         /* if recovery fails, force it again */
1797         rec->need_recovery = true;
1798
1799         if (rec->election_timeout) {
1800                 /* an election is in progress */
1801                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1802                 return -1;
1803         }
1804
1805         ban_misbehaving_nodes(rec, &self_ban);
1806         if (self_ban) {
1807                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1808                 return -1;
1809         }
1810
1811         if (ctdb->tunable.verify_recovery_lock != 0) {
1812                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1813                 start_time = timeval_current();
1814                 if (!ctdb_recovery_lock(ctdb, true)) {
1815                         if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1816                                 /* If ctdb is trying first recovery, it's
1817                                  * possible that current node does not know yet
1818                                  * who the recmaster is.
1819                                  */
1820                                 DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
1821                                                 " - retrying recovery\n"));
1822                                 return -1;
1823                         }
1824
1825                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1826                                          "and ban ourself for %u seconds\n",
1827                                          ctdb->tunable.recovery_ban_period));
1828                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1829                         return -1;
1830                 }
1831                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1832                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1833         }
1834
1835         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1836
1837         /* get a list of all databases */
1838         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1841                 return -1;
1842         }
1843
1844         /* we do the db creation before we set the recovery mode, so the freeze happens
1845            on all databases we will be dealing with. */
1846
1847         /* verify that we have all the databases any other node has */
1848         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1849         if (ret != 0) {
1850                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1851                 return -1;
1852         }
1853
1854         /* verify that all other nodes have all our databases */
1855         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1858                 return -1;
1859         }
1860         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1861
1862         /* update the database priority for all remote databases */
1863         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1866         }
1867         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1868
1869
1870         /* update all other nodes to use the same setting for reclock files
1871            as the local recovery master.
1872         */
1873         sync_recovery_lock_file_across_cluster(rec);
1874
1875         /* set recovery mode to active on all nodes */
1876         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1877         if (ret != 0) {
1878                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1879                 return -1;
1880         }
1881
1882         /* execute the "startrecovery" event script on all nodes */
1883         ret = run_startrecovery_eventscript(rec, nodemap);
1884         if (ret!=0) {
1885                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1886                 return -1;
1887         }
1888
1889         /*
1890           update all nodes to have the same flags that we have
1891          */
1892         for (i=0;i<nodemap->num;i++) {
1893                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1894                         continue;
1895                 }
1896
1897                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1898                 if (ret != 0) {
1899                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1900                         DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
1901                         } else {
1902                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1903                                 return -1;
1904                         }
1905                 }
1906         }
1907
1908         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1909
1910         /* pick a new generation number */
1911         generation = new_generation();
1912
1913         /* change the vnnmap on this node to use the new generation 
1914            number but not on any other nodes.
1915            this guarantees that if we abort the recovery prematurely
1916            for some reason (a node stops responding?)
1917            that we can just return immediately and we will reenter
1918            recovery shortly again.
1919            I.e. we deliberately leave the cluster with an inconsistent
1920            generation id to allow us to abort recovery at any stage and
1921            just restart it from scratch.
1922          */
1923         vnnmap->generation = generation;
1924         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1925         if (ret != 0) {
1926                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1927                 return -1;
1928         }
1929
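             /* the new generation number doubles as the transaction id for
              * the TRANSACTION_START/TRANSACTION_COMMIT controls sent to
              * all active nodes below */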
1930         data.dptr = (void *)&generation;
1931         data.dsize = sizeof(uint32_t);
1932
1933         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1934         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1935                                         nodes, 0,
1936                                         CONTROL_TIMEOUT(), false, data,
1937                                         NULL,
1938                                         transaction_start_fail_callback,
1939                                         rec) != 0) {
1940                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1941                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1942                                         nodes, 0,
1943                                         CONTROL_TIMEOUT(), false, tdb_null,
1944                                         NULL,
1945                                         NULL,
1946                                         NULL) != 0) {
1947                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1948                 }
1949                 return -1;
1950         }
1951
1952         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1953
1954         for (i=0;i<dbmap->num;i++) {
1955                 ret = recover_database(rec, mem_ctx,
1956                                        dbmap->dbs[i].dbid,
1957                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1958                                        pnn, nodemap, generation);
1959                 if (ret != 0) {
1960                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1961                         return -1;
1962                 }
1963         }
1964
1965         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1966
1967         /* commit all the changes */
1968         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1969                                         nodes, 0,
1970                                         CONTROL_TIMEOUT(), false, data,
1971                                         NULL, NULL,
1972                                         NULL) != 0) {
1973                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1974                 return -1;
1975         }
1976
1977         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1978         
1979
1980         /* update the capabilities for all nodes */
1981         ret = update_capabilities(ctdb, nodemap);
1982         if (ret!=0) {
1983                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1984                 return -1;
1985         }
1986
1987         /* build a new vnn map with all the currently active and
1988            unbanned nodes */
1989         generation = new_generation();
1990         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1991         CTDB_NO_MEMORY(ctdb, vnnmap);
1992         vnnmap->generation = generation;
1993         vnnmap->size = 0;
1994         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1995         CTDB_NO_MEMORY(ctdb, vnnmap->map);
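             /* the new vnnmap contains every active node that has the
              * lmaster capability */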
1996         for (i=j=0;i<nodemap->num;i++) {
1997                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1998                         continue;
1999                 }
2000                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
2001                         /* this node can not be an lmaster */
2002                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
2003                         continue;
2004                 }
2005
2006                 vnnmap->size++;
2007                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2008                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2009                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
2010
2011         }
2012         if (vnnmap->size == 0) {
2013                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2014                 vnnmap->size++;
2015                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2016                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2017                 vnnmap->map[0] = pnn;
2018         }       
2019
2020         /* update to the new vnnmap on all nodes */
2021         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2022         if (ret != 0) {
2023                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2024                 return -1;
2025         }
2026
2027         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2028
2029         /* update recmaster to point to us for all nodes */
2030         ret = set_recovery_master(ctdb, nodemap, pnn);
2031         if (ret!=0) {
2032                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2033                 return -1;
2034         }
2035
2036         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2037
2038         /* disable recovery mode */
2039         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2040         if (ret != 0) {
2041                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2042                 return -1;
2043         }
2044
2045         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2046
2047         /* Fetch known/available public IPs from each active node */
2048         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2049         if (ret != 0) {
2050                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2051                                  culprit));
2052                 rec->need_takeover_run = true;
2053                 return -1;
2054         }
2055
2056         do_takeover_run(rec, nodemap, false);
2057
2058         /* execute the "recovered" event script on all nodes */
2059         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2060         if (ret!=0) {
2061                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2062                 return -1;
2063         }
2064
2065         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2066
2067         /* send a message to all clients telling them that the cluster 
2068            has been reconfigured */
2069         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2070                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2071         if (ret != 0) {
2072                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2073                 return -1;
2074         }
2075
2076         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2077
2078         rec->need_recovery = false;
2079
2080         /* we managed to complete a full recovery, make sure to forgive
2081            any past sins by the nodes that could now participate in the
2082            recovery.
2083         */
2084         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2085         for (i=0;i<nodemap->num;i++) {
2086                 struct ctdb_banning_state *ban_state;
2087
2088                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2089                         continue;
2090                 }
2091
2092                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2093                 if (ban_state == NULL) {
2094                         continue;
2095                 }
2096
2097                 ban_state->count = 0;
2098         }
2099
2100
2101         /* We just finished a recovery successfully. 
2102            We now wait for rerecovery_timeout before we allow 
2103            another recovery to take place.
2104         */
2105         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2106         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2107         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2108
2109         return 0;
2110 }
2111
2112
2113 /*
2114   elections are won by first checking the number of connected nodes, then
2115   the priority time, then the pnn
2116  */
2117 struct election_message {
2118         uint32_t num_connected;
2119         struct timeval priority_time;
2120         uint32_t pnn;
2121         uint32_t node_flags;
2122 };
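     /* an election_message is sent raw over the wire as the payload of the
        CTDB_SRVID_RECOVERY message (see send_election_request() below) */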
2123
2124 /*
2125   form this node's election data
2126  */
2127 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2128 {
2129         int ret, i;
2130         struct ctdb_node_map *nodemap;
2131         struct ctdb_context *ctdb = rec->ctdb;
2132
2133         ZERO_STRUCTP(em);
2134
2135         em->pnn = rec->ctdb->pnn;
2136         em->priority_time = rec->priority_time;
2137
2138         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2139         if (ret != 0) {
2140                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2141                 return;
2142         }
2143
2144         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2145         em->node_flags = rec->node_flags;
2146
2147         for (i=0;i<nodemap->num;i++) {
2148                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2149                         em->num_connected++;
2150                 }
2151         }
2152
2153         /* we shouldn't try to win this election if we can't be a recmaster */
2154         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2155                 em->num_connected = 0;
2156                 em->priority_time = timeval_current();
2157         }
2158
2159         talloc_free(nodemap);
2160 }
2161
2162 /*
2163   see if the given election data wins
2164  */
2165 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2166 {
2167         struct election_message myem;
2168         int cmp = 0;
2169
2170         ctdb_election_data(rec, &myem);
2171
2172         /* we can't win if we don't have the recmaster capability */
2173         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2174                 return false;
2175         }
2176
2177         /* we can't win if we are banned */
2178         if (rec->node_flags & NODE_FLAGS_BANNED) {
2179                 return false;
2180         }
2181
2182         /* we can't win if we are stopped */
2183         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2184                 return false;
2185         }
2186
2187         /* we will automatically win if the other node is banned */
2188         if (em->node_flags & NODE_FLAGS_BANNED) {
2189                 return true;
2190         }
2191
2192         /* we will automatically win if the other node is stopped */
2193         if (em->node_flags & NODE_FLAGS_STOPPED) {
2194                 return true;
2195         }
2196
2197         /* try to use the most connected node */
2198         if (cmp == 0) {
2199                 cmp = (int)myem.num_connected - (int)em->num_connected;
2200         }
2201
2202         /* then the longest running node */
2203         if (cmp == 0) {
2204                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2205         }
2206
2207         if (cmp == 0) {
2208                 cmp = (int)myem.pnn - (int)em->pnn;
2209         }
2210
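             /* a positive cmp means our own election data wins */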
2211         return cmp > 0;
2212 }
2213
2214 /*
2215   send out an election request
2216  */
2217 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2218 {
2219         int ret;
2220         TDB_DATA election_data;
2221         struct election_message emsg;
2222         uint64_t srvid;
2223         struct ctdb_context *ctdb = rec->ctdb;
2224
2225         srvid = CTDB_SRVID_RECOVERY;
2226
2227         ctdb_election_data(rec, &emsg);
2228
2229         election_data.dsize = sizeof(struct election_message);
2230         election_data.dptr  = (unsigned char *)&emsg;
2231
2232
2233         /* first we assume we will win the election and set the
2234            recovery master to be ourself on the current node
2235          */
2236         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2237         if (ret != 0) {
2238                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
2239                 return -1;
2240         }
2241
2242
2243         /* send an election message to all active nodes */
2244         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2245         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2246 }
2247
2248 /*
2249   this function will unban all nodes in the cluster
2250 */
2251 static void unban_all_nodes(struct ctdb_context *ctdb)
2252 {
2253         int ret, i;
2254         struct ctdb_node_map *nodemap;
2255         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2256         
2257         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2258         if (ret != 0) {
2259                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2260                 return;
2261         }
2262
2263         for (i=0;i<nodemap->num;i++) {
2264                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2265                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2266                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2267                                                  nodemap->nodes[i].pnn, 0,
2268                                                  NODE_FLAGS_BANNED);
2269                         if (ret != 0) {
2270                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2271                         }
2272                 }
2273         }
2274
2275         talloc_free(tmp_ctx);
2276 }
2277
2278
2279 /*
2280   we think we are winning the election - send a broadcast election request
2281  */
2282 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2283 {
2284         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2285         int ret;
2286
2287         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2288         if (ret != 0) {
2289                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2290         }
2291
2292         talloc_free(rec->send_election_te);
2293         rec->send_election_te = NULL;
2294 }
2295
2296 /*
2297   handler for memory dumps
2298 */
2299 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2300                              TDB_DATA data, void *private_data)
2301 {
2302         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2303         TDB_DATA *dump;
2304         int ret;
2305         struct srvid_request *rd;
2306
2307         if (data.dsize != sizeof(struct srvid_request)) {
2308                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2309                 talloc_free(tmp_ctx);
2310                 return;
2311         }
2312         rd = (struct srvid_request *)data.dptr;
2313
2314         dump = talloc_zero(tmp_ctx, TDB_DATA);
2315         if (dump == NULL) {
2316                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2317                 talloc_free(tmp_ctx);
2318                 return;
2319         }
2320         ret = ctdb_dump_memory(ctdb, dump);
2321         if (ret != 0) {
2322                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2323                 talloc_free(tmp_ctx);
2324                 return;
2325         }
2326
2327         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2328
2329         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2330         if (ret != 0) {
2331                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2332                 talloc_free(tmp_ctx);
2333                 return;
2334         }
2335
2336         talloc_free(tmp_ctx);
2337 }
2338
2339 /*
2340   handler for reload_nodes
2341 */
2342 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2343                              TDB_DATA data, void *private_data)
2344 {
2345         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2346
2347         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2348
2349         ctdb_load_nodes_file(rec->ctdb);
2350 }
2351
2352
2353 static void ctdb_rebalance_timeout(struct event_context *ev,
2354                                    struct timed_event *te,
2355                                    struct timeval t, void *p)
2356 {
2357         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2358
2359         if (rec->force_rebalance_nodes == NULL) {
2360                 DEBUG(DEBUG_ERR,
2361                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2362                 return;
2363         }
2364
2365         DEBUG(DEBUG_NOTICE,
2366               ("Rebalance timeout occurred - do takeover run\n"));
2367         do_takeover_run(rec, rec->nodemap, false);
2368 }
2369
2370         
2371 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2372                                         uint64_t srvid,
2373                                         TDB_DATA data, void *private_data)
2374 {
2375         uint32_t pnn;
2376         uint32_t *t;
2377         int len;
2378         uint32_t deferred_rebalance;
2379         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2380
2381         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2382                 return;
2383         }
2384
2385         if (data.dsize != sizeof(uint32_t)) {
2386                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2387                 return;
2388         }
2389
2390         pnn = *(uint32_t *)&data.dptr[0];
2391
2392         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2393
2394         /* Copy any existing list of nodes.  There's probably some
2395          * sort of realloc variant that will do this but we need to
2396          * make sure that freeing the old array also cancels the timer
2397          * event for the timeout... not sure if realloc will do that.
2398          */
2399         len = (rec->force_rebalance_nodes != NULL) ?
2400                 talloc_array_length(rec->force_rebalance_nodes) :
2401                 0;
2402
2403         /* This allows duplicates to be added but they don't cause
2404          * harm.  A call to add a duplicate PNN arguably means that
2405          * the timeout should be reset, so this is the simplest
2406          * solution.
2407          */
2408         t = talloc_zero_array(rec, uint32_t, len+1);
2409         CTDB_NO_MEMORY_VOID(ctdb, t);
2410         if (len > 0) {
2411                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2412         }
2413         t[len] = pnn;
2414
2415         talloc_free(rec->force_rebalance_nodes);
2416
2417         rec->force_rebalance_nodes = t;
2418
2419         /* If configured, setup a deferred takeover run to make sure
2420          * that certain nodes get IPs rebalanced to them.  This will
2421          * be cancelled if a successful takeover run happens before
2422          * the timeout.  Assign tunable value to variable for
2423          * readability.
2424          */
2425         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2426         if (deferred_rebalance != 0) {
2427                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2428                                 timeval_current_ofs(deferred_rebalance, 0),
2429                                 ctdb_rebalance_timeout, rec);
2430         }
2431 }
2432
2433
2434
2435 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2436                              TDB_DATA data, void *private_data)
2437 {
2438         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2439         struct ctdb_public_ip *ip;
2440
2441         if (rec->recmaster != rec->ctdb->pnn) {
2442                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2443                 return;
2444         }
2445
2446         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2447                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2448                 return;
2449         }
2450
2451         ip = (struct ctdb_public_ip *)data.dptr;
2452
2453         update_ip_assignment_tree(rec->ctdb, ip);
2454 }
2455
2456
2457 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2458 {
2459         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2460 }
2461
2462 static void reenable_takeover_runs(struct event_context *ev,
2463                                    struct timed_event *te,
2464                                    struct timeval t, void *p)
2465 {
2466         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2467
2468         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2469         clear_takeover_runs_disable(rec);
2470 }
2471
2472 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2473                                           uint64_t srvid, TDB_DATA data,
2474                                           void *private_data)
2475 {
2476         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2477                                                     struct ctdb_recoverd);
2478         struct srvid_request_data *r;
2479         uint32_t timeout;
2480         TDB_DATA result;
2481         int32_t ret = 0;
2482
2483         /* Validate input data */
2484         if (data.dsize != sizeof(struct srvid_request_data)) {
2485                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2486                                  "expecting %lu\n", (long unsigned)data.dsize,
2487                                  (long unsigned)sizeof(struct srvid_request_data)));
2488                 return;
2489         }
2490         if (data.dptr == NULL) {
2491                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2492                 return;
2493         }
2494
2495         r = (struct srvid_request_data *)data.dptr;
2496         timeout = r->data;
2497
2498         if (timeout == 0) {
2499                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2500                 clear_takeover_runs_disable(rec);
2501                 ret = ctdb_get_pnn(ctdb);
2502                 goto done;
2503         }
2504
2505         if (rec->takeover_run_in_progress) {
2506                 DEBUG(DEBUG_ERR,
2507                       ("Unable to disable takeover runs - in progress\n"));
2508                 ret = -EAGAIN;
2509                 goto done;
2510         }
2511
2512         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2513
2514         /* Clear any old timers */
2515         clear_takeover_runs_disable(rec);
2516
2517         /* When this is non-NULL it indicates that takeover runs are
2518          * disabled.  This context also holds the timeout timer.
2519          */
2520         rec->takeover_runs_disable_ctx = talloc_new(rec);
2521         if (rec->takeover_runs_disable_ctx == NULL) {
2522                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2523                 ret = -ENOMEM;
2524                 goto done;
2525         }
2526
2527         /* Arrange for the timeout to occur */
2528         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2529                         timeval_current_ofs(timeout, 0),
2530                         reenable_takeover_runs,
2531                         rec);
2532
2533         /* Returning our PNN tells the caller that we succeeded */
2534         ret = ctdb_get_pnn(ctdb);
2535 done:
2536         result.dsize = sizeof(int32_t);
2537         result.dptr  = (uint8_t *)&ret;
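             /* struct srvid_request_data starts with the same pnn/srvid
              * members as struct srvid_request, so the cast below hands
              * srvid_request_reply() a valid request header (assumption
              * based on the struct definitions in ctdb_private.h) */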
2538         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2539 }
2540
2541 /* Backward compatibility for this SRVID - call
2542  * disable_takeover_runs_handler() instead
2543  */
2544 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2545                                      TDB_DATA data, void *private_data)
2546 {
2547         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2548                                                     struct ctdb_recoverd);
2549         TDB_DATA data2;
2550         struct srvid_request_data *req;
2551
2552         if (data.dsize != sizeof(uint32_t)) {
2553                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2554                                  "expecting %lu\n", (long unsigned)data.dsize,
2555                                  (long unsigned)sizeof(uint32_t)));
2556                 return;
2557         }
2558         if (data.dptr == NULL) {
2559                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2560                 return;
2561         }
2562
2563         req = talloc(ctdb, struct srvid_request_data);
2564         CTDB_NO_MEMORY_VOID(ctdb, req);
2565
2566         req->srvid = 0; /* No reply */
2567         req->pnn = -1;
2568         req->data = *((uint32_t *)data.dptr); /* Timeout */
2569
2570         data2.dsize = sizeof(*req);
2571         data2.dptr = (uint8_t *)req;
2572
2573         disable_takeover_runs_handler(rec->ctdb,
2574                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2575                                       data2, rec);
2576 }
2577
2578 /*
2579   handler for ip reallocate, just add it to the list of requests and 
2580   handle this later in the monitor_cluster loop so we do not recurse
2581   with other requests to takeover_run()
2582 */
2583 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2584                                   TDB_DATA data, void *private_data)
2585 {
2586         struct srvid_request *request;
2587         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2588                                                     struct ctdb_recoverd);
2589
2590         if (data.dsize != sizeof(struct srvid_request)) {
2591                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2592                 return;
2593         }
2594
2595         request = (struct srvid_request *)data.dptr;
2596
2597         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2598 }
2599
2600 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2601                                           struct ctdb_recoverd *rec)
2602 {
2603         TDB_DATA result;
2604         int32_t ret;
2605         uint32_t culprit;
2606         struct srvid_requests *current;
2607
2608         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2609
2610         /* Only process requests that are currently pending.  More
2611          * might come in while the takeover run is in progress and
2612          * they will need to be processed later since they might
2613          * be in response to flag changes.
2614          */
2615         current = rec->reallocate_requests;
2616         rec->reallocate_requests = NULL;
2617
2618         /* update the list of public ips that a node can handle for
2619            all connected nodes
2620         */
2621         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2622         if (ret != 0) {
2623                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2624                                  culprit));
2625                 rec->need_takeover_run = true;
2626         }
2627         if (ret == 0) {
2628                 if (do_takeover_run(rec, rec->nodemap, false)) {
2629                         ret = ctdb_get_pnn(ctdb);
2630                 } else {
2631                         ret = -1;
2632                 }
2633         }
2634
2635         result.dsize = sizeof(int32_t);
2636         result.dptr  = (uint8_t *)&ret;
2637
2638         srvid_requests_reply(ctdb, &current, result);
2639 }
2640
2641
2642 /*
2643   handler for recovery master elections
2644 */
2645 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2646                              TDB_DATA data, void *private_data)
2647 {
2648         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2649         int ret;
2650         struct election_message *em = (struct election_message *)data.dptr;
2651         TALLOC_CTX *mem_ctx;
2652
2653         /* Ignore election packets from ourself */
2654         if (ctdb->pnn == em->pnn) {
2655                 return;
2656         }
2657
2658         /* we got an election packet - update the timeout for the election */
2659         talloc_free(rec->election_timeout);
2660         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2661                                                 fast_start ?
2662                                                 timeval_current_ofs(0, 500000) :
2663                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2664                                                 ctdb_election_timeout, rec);
2665
2666         mem_ctx = talloc_new(ctdb);
2667
2668         /* someone called an election. check their election data
2669            and if we disagree and we would rather be the elected node, 
2670            send a new election message to all other nodes
2671          */
2672         if (ctdb_election_win(rec, em)) {
2673                 if (!rec->send_election_te) {
2674                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2675                                                                 timeval_current_ofs(0, 500000),
2676                                                                 election_send_request, rec);
2677                 }
2678                 talloc_free(mem_ctx);
2679                 /*unban_all_nodes(ctdb);*/
2680                 return;
2681         }
2682         
2683         /* we didn't win */
2684         talloc_free(rec->send_election_te);
2685         rec->send_election_te = NULL;
2686
2687         if (ctdb->tunable.verify_recovery_lock != 0) {
2688                 /* release the recmaster lock */
2689                 if (em->pnn != ctdb->pnn &&
2690                     ctdb->recovery_lock_fd != -1) {
2691                         DEBUG(DEBUG_NOTICE, ("Release the recovery lock\n"));
2692                         close(ctdb->recovery_lock_fd);
2693                         ctdb->recovery_lock_fd = -1;
2694                         unban_all_nodes(ctdb);
2695                 }
2696         }
2697
2698         /* ok, let that guy become recmaster then */
2699         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2700         if (ret != 0) {
2701                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
2702                 talloc_free(mem_ctx);
2703                 return;
2704         }
2705
2706         talloc_free(mem_ctx);
2707         return;
2708 }
2709
2710
2711 /*
2712   force the start of the election process
2713  */
2714 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2715                            struct ctdb_node_map *nodemap)
2716 {
2717         int ret;
2718         struct ctdb_context *ctdb = rec->ctdb;
2719
2720         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2721
2722         /* set all nodes to recovery mode to stop all internode traffic */
2723         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2724         if (ret != 0) {
2725                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2726                 return;
2727         }
2728
2729         talloc_free(rec->election_timeout);
2730         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2731                                                 fast_start ?
2732                                                 timeval_current_ofs(0, 500000) :
2733                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2734                                                 ctdb_election_timeout, rec);
2735
2736         ret = send_election_request(rec, pnn);
2737         if (ret!=0) {
2738                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2739                 return;
2740         }
2741
2742         /* wait for a few seconds to collect all responses */
2743         ctdb_wait_election(rec);
2744 }
2745
2746
2747
2748 /*
2749   handler for when a node changes its flags
2750 */
2751 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2752                             TDB_DATA data, void *private_data)
2753 {
2754         int ret;
2755         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2756         struct ctdb_node_map *nodemap=NULL;
2757         TALLOC_CTX *tmp_ctx;
2758         int i;
2759         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2760         int disabled_flag_changed;
2761
2762         if (data.dsize != sizeof(*c)) {
2763                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2764                 return;
2765         }
2766
2767         tmp_ctx = talloc_new(ctdb);
2768         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2769
2770         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2771         if (ret != 0) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2773                 talloc_free(tmp_ctx);
2774                 return;
2775         }
2776
2777
2778         for (i=0;i<nodemap->num;i++) {
2779                 if (nodemap->nodes[i].pnn == c->pnn) break;
2780         }
2781
2782         if (i == nodemap->num) {
2783                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2784                 talloc_free(tmp_ctx);
2785                 return;
2786         }
2787
2788         if (c->old_flags != c->new_flags) {
2789                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2790         }
2791
2792         disabled_flag_changed = (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2793
2794         nodemap->nodes[i].flags = c->new_flags;
2795
2796         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2797                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2798
2799         if (ret == 0) {
2800                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2801                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2802         }
2803         
2804         if (ret == 0 &&
2805             ctdb->recovery_master == ctdb->pnn &&
2806             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2807                 /* Only do the takeover run if the permanently disabled or
2808                    unhealthy flags changed, since these will cause an ip
2809                    failover but not a recovery.
2810                    If the node became disconnected or banned this will also
2811                    lead to an ip address failover, but that is handled
2812                    during recovery.
2813                 */
2814                 if (disabled_flag_changed) {
2815                         rec->need_takeover_run = true;
2816                 }
2817         }
2818
2819         talloc_free(tmp_ctx);
2820 }
2821
2822 /*
2823   handler for when we need to push out flag changes to all other nodes
2824 */
2825 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2826                             TDB_DATA data, void *private_data)
2827 {
2828         int ret;
2829         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2830         struct ctdb_node_map *nodemap=NULL;
2831         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2832         uint32_t recmaster;
2833         uint32_t *nodes;
2834
2835         /* find the recovery master */
2836         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2837         if (ret != 0) {
2838                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2839                 talloc_free(tmp_ctx);
2840                 return;
2841         }
2842
2843         /* read the node flags from the recmaster */
2844         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2845         if (ret != 0) {
2846                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2847                 talloc_free(tmp_ctx);
2848                 return;
2849         }
2850         if (c->pnn >= nodemap->num) {
2851                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2852                 talloc_free(tmp_ctx);
2853                 return;
2854         }
2855
2856         /* send the flags update to all connected nodes */
2857         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2858
2859         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2860                                       nodes, 0, CONTROL_TIMEOUT(),
2861                                       false, data,
2862                                       NULL, NULL,
2863                                       NULL) != 0) {
2864                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2865
2866                 talloc_free(tmp_ctx);
2867                 return;
2868         }
2869
2870         talloc_free(tmp_ctx);
2871 }
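
/*
  Illustrative sketch (not part of this file): the message that drives
  the handler above is just a packed struct ctdb_node_flag_change, so a
  sender would look roughly like this, assuming the srvid this handler
  is registered under (push_flags_srvid, target_pnn, old_flags and
  new_flags are hypothetical placeholders):

        struct ctdb_node_flag_change c;
        TDB_DATA data;

        c.pnn       = target_pnn;       // node whose flags changed
        c.old_flags = old_flags;
        c.new_flags = new_flags;

        data.dptr  = (uint8_t *)&c;
        data.dsize = sizeof(c);

        ctdb_client_send_message(ctdb, ctdb_get_pnn(ctdb),
                                 push_flags_srvid, data);

  The handler rejects any pnn not covered by the recmaster's nodemap,
  then fans the same blob out to every connected node with
  CTDB_CONTROL_MODIFY_FLAGS.
*/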
2872
2873
2874 struct verify_recmode_normal_data {
2875         uint32_t count;
2876         enum monitor_result status;
2877 };
2878
2879 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2880 {
2881         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2882
2883
2884         /* one more node has responded with recmode data */
2885         rmdata->count--;
2886
2887         /* if we failed to get the recmode, then return an error and let
2888            the main loop try again.
2889         */
2890         if (state->state != CTDB_CONTROL_DONE) {
2891                 if (rmdata->status == MONITOR_OK) {
2892                         rmdata->status = MONITOR_FAILED;
2893                 }
2894                 return;
2895         }
2896
2897         /* if we got a response, then the recmode will be stored in the
2898            status field
2899         */
2900         if (state->status != CTDB_RECOVERY_NORMAL) {
2901                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2902                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2903         }
2904
2905         return;
2906 }
2907
2908
2909 /* verify that all nodes are in normal recovery mode */
2910 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2911 {
2912         struct verify_recmode_normal_data *rmdata;
2913         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2914         struct ctdb_client_control_state *state;
2915         enum monitor_result status;
2916         int j;
2917         
2918         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2919         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2920         rmdata->count  = 0;
2921         rmdata->status = MONITOR_OK;
2922
2923         /* loop over all active nodes and send an async getrecmode call to
2924            them */
2925         for (j=0; j<nodemap->num; j++) {
2926                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2927                         continue;
2928                 }
2929                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2930                                         CONTROL_TIMEOUT(), 
2931                                         nodemap->nodes[j].pnn);
2932                 if (state == NULL) {
2933                         /* we failed to send the control, treat this as 
2934                            an error and try again next iteration
2935                         */                      
2936                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2937                         talloc_free(mem_ctx);
2938                         return MONITOR_FAILED;
2939                 }
2940
2941                 /* set up the callback functions */
2942                 state->async.fn = verify_recmode_normal_callback;
2943                 state->async.private_data = rmdata;
2944
2945                 /* one more control to wait for to complete */
2946                 rmdata->count++;
2947         }
2948
2949
2950         /* now wait for up to the maximum number of seconds allowed
2951            or until all nodes we expect a response from have replied
2952         */
2953         while (rmdata->count > 0) {
2954                 event_loop_once(ctdb->ev);
2955         }
2956
2957         status = rmdata->status;
2958         talloc_free(mem_ctx);
2959         return status;
2960 }
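
/*
  The shape above - fire one async control per active node, count down
  in the callback, and spin event_loop_once() until the count reaches
  zero - is the daemon's standard fan-out/fan-in idiom. Stripped to its
  skeleton it looks like this (sketch only; my_callback stands in for a
  callback such as verify_recmode_normal_callback and must decrement
  the counter it is handed):

        struct ctdb_client_control_state *state;
        uint32_t pending = 0;
        int j;

        for (j = 0; j < nodemap->num; j++) {
                state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx,
                                                  CONTROL_TIMEOUT(),
                                                  nodemap->nodes[j].pnn);
                state->async.fn = my_callback;
                state->async.private_data = &pending;
                pending++;
        }
        while (pending > 0) {
                event_loop_once(ctdb->ev);
        }

  Each control carries its own CONTROL_TIMEOUT(), so a dead node should
  not stall the loop: its callback fires with state->state !=
  CTDB_CONTROL_DONE and still decrements the counter.
*/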
2961
2962
2963 struct verify_recmaster_data {
2964         struct ctdb_recoverd *rec;
2965         uint32_t count;
2966         uint32_t pnn;
2967         enum monitor_result status;
2968 };
2969
2970 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2971 {
2972         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2973
2974
2975         /* one more node has responded with recmaster data */
2976         rmdata->count--;
2977
2978         /* if we failed to get the recmaster, then return an error and let
2979            the main loop try again.
2980         */
2981         if (state->state != CTDB_CONTROL_DONE) {
2982                 if (rmdata->status == MONITOR_OK) {
2983                         rmdata->status = MONITOR_FAILED;
2984                 }
2985                 return;
2986         }
2987
2988         /* if we got a response, then the recmaster will be stored in the
2989            status field
2990         */
2991         if (state->status != rmdata->pnn) {
2992                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2993                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2994                 rmdata->status = MONITOR_ELECTION_NEEDED;
2995         }
2996
2997         return;
2998 }
2999
3000
3001 /* verify that all nodes agree that we are the recmaster */
3002 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3003 {
3004         struct ctdb_context *ctdb = rec->ctdb;
3005         struct verify_recmaster_data *rmdata;
3006         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3007         struct ctdb_client_control_state *state;
3008         enum monitor_result status;
3009         int j;
3010         
3011         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3012         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3013         rmdata->rec    = rec;
3014         rmdata->count  = 0;
3015         rmdata->pnn    = pnn;
3016         rmdata->status = MONITOR_OK;
3017
3018         /* loop over all active nodes and send an async getrecmaster call to
3019            them */
3020         for (j=0; j<nodemap->num; j++) {
3021                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3022                         continue;
3023                 }
3024                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3025                                         CONTROL_TIMEOUT(),
3026                                         nodemap->nodes[j].pnn);
3027                 if (state == NULL) {
3028                         /* we failed to send the control, treat this as 
3029                            an error and try again next iteration
3030                         */                      
3031                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3032                         talloc_free(mem_ctx);
3033                         return MONITOR_FAILED;
3034                 }
3035
3036                 /* set up the callback functions */
3037                 state->async.fn = verify_recmaster_callback;
3038                 state->async.private_data = rmdata;
3039
3040                 /* one more control to wait for to complete */
3041                 rmdata->count++;
3042         }
3043
3044
3045         /* now wait for up to the maximum number of seconds allowed
3046            or until all nodes we expect a response from have replied
3047         */
3048         while (rmdata->count > 0) {
3049                 event_loop_once(ctdb->ev);
3050         }
3051
3052         status = rmdata->status;
3053         talloc_free(mem_ctx);
3054         return status;
3055 }
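
/*
  verify_recmaster() is the same fan-out as verify_recmode() above; the
  differences are what the reply is compared against (the expected
  recmaster pnn instead of CTDB_RECOVERY_NORMAL) and that a disagreeing
  node is blamed via ctdb_set_culprit(), feeding the ban culprit
  accounting, before MONITOR_ELECTION_NEEDED is returned.
*/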
3056
3057 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3058                                     struct ctdb_recoverd *rec)
3059 {
3060         struct ctdb_control_get_ifaces *ifaces = NULL;
3061         TALLOC_CTX *mem_ctx;
3062         bool ret = false;
3063
3064         mem_ctx = talloc_new(NULL);
3065
3066         /* Read the interfaces from the local node */
3067         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3068                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3069                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3070                 /* We could return an error.  However, this will be
3071                  * rare so we'll decide that the interfaces have
3072                  * actually changed, just in case.
3073                  */
3074                 talloc_free(mem_ctx);
3075                 return true;
3076         }
3077
3078         if (!rec->ifaces) {
3079                 /* We haven't been here before so things have changed */
3080                 DEBUG(DEBUG_NOTICE, ("Initial interface list fetched\n"));
3081                 ret = true;
3082         } else if (rec->ifaces->num != ifaces->num) {
3083                 /* Number of interfaces has changed */
3084                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3085                                      rec->ifaces->num, ifaces->num));
3086                 ret = true;
3087         } else {
3088                 /* See if interface names or link states have changed */
3089                 int i;
3090                 for (i = 0; i < rec->ifaces->num; i++) {
3091                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3092                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3093                                 DEBUG(DEBUG_NOTICE,
3094                                       ("Interface in slot %d changed: %s => %s\n",
3095                                        i, iface->name, ifaces->ifaces[i].name));
3096                                 ret = true;
3097                                 break;
3098                         }
3099                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3100                                 DEBUG(DEBUG_NOTICE,
3101                                       ("Interface %s changed state: %d => %d\n",
3102                                        iface->name, iface->link_state,
3103                                        ifaces->ifaces[i].link_state));
3104                                 ret = true;
3105                                 break;
3106                         }
3107                 }
3108         }
3109
3110         talloc_free(rec->ifaces);
3111         rec->ifaces = talloc_steal(rec, ifaces);
3112
3113         talloc_free(mem_ctx);
3114         return ret;
3115 }
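
/*
  Note that the comparison above is positional: slot i in the cached
  list is compared with slot i in the fresh one, so it assumes the
  daemon reports interfaces in a stable order. A mere reordering would
  be flagged as a change, but that is harmless here - the only
  consequence of a false positive is an extra takeover run.
*/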
3116
3117 /* called to check that the local allocation of public ip addresses is ok.
3118 */
3119 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3120 {
3121         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3122         struct ctdb_uptime *uptime1 = NULL;
3123         struct ctdb_uptime *uptime2 = NULL;
3124         int ret, j;
3125         bool need_takeover_run = false;
3126
3127         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3128                                 CTDB_CURRENT_NODE, &uptime1);
3129         if (ret != 0) {
3130                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3131                 talloc_free(mem_ctx);
3132                 return -1;
3133         }
3134
3135         if (interfaces_have_changed(ctdb, rec)) {
3136                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3137                                      "local node %u - force takeover run\n",
3138                                      pnn));
3139                 need_takeover_run = true;
3140         }
3141
3142         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3143                                 CTDB_CURRENT_NODE, &uptime2);
3144         if (ret != 0) {
3145                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3146                 talloc_free(mem_ctx);
3147                 return -1;
3148         }
3149
3150         /* skip the check if the startrecovery time has changed */
3151         if (timeval_compare(&uptime1->last_recovery_started,
3152                             &uptime2->last_recovery_started) != 0) {
3153                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3154                 talloc_free(mem_ctx);
3155                 return 0;
3156         }
3157
3158         /* skip the check if the endrecovery time has changed */
3159         if (timeval_compare(&uptime1->last_recovery_finished,
3160                             &uptime2->last_recovery_finished) != 0) {
3161                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3162                 talloc_free(mem_ctx);
3163                 return 0;
3164         }
3165
3166         /* skip the check if we have started but not finished recovery */
3167         if (timeval_compare(&uptime1->last_recovery_finished,
3168                             &uptime1->last_recovery_started) != 1) {
3169                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3170                 talloc_free(mem_ctx);
3171
3172                 return 0;
3173         }
3174
3175         /* verify that we have the ip addresses we should have
3176            and we don't have ones we shouldn't have.
3177            if we find an inconsistency we set recmode to
3178            active on the local node and wait for the recmaster
3179            to do a full blown recovery.
3180            also if the pnn is -1 and we are healthy and can host the ip
3181            we also request an ip reallocation.
3182         */
3183         if (ctdb->tunable.disable_ip_failover == 0) {
3184                 struct ctdb_all_public_ips *ips = NULL;
3185
3186                 /* read the *available* IPs from the local node */
3187                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3188                 if (ret != 0) {
3189                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3190                         talloc_free(mem_ctx);
3191                         return -1;
3192                 }
3193
3194                 for (j=0; j<ips->num; j++) {
3195                         if (ips->ips[j].pnn == -1 &&
3196                             nodemap->nodes[pnn].flags == 0) {
3197                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3198                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3199                                 need_takeover_run = true;
3200                         }
3201                 }
3202
3203                 talloc_free(ips);
3204
3205                 /* read the *known* IPs from the local node */
3206                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3207                 if (ret != 0) {
3208                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3209                         talloc_free(mem_ctx);
3210                         return -1;
3211                 }
3212
3213                 for (j=0; j<ips->num; j++) {
3214                         if (ips->ips[j].pnn == pnn) {
3215                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3216                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3217                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3218                                         need_takeover_run = true;
3219                                 }
3220                         } else {
3221                                 if (ctdb->do_checkpublicip &&
3222                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3223
3224                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3225                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3226
3227                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3228                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3229                                         }
3230                                 }
3231                         }
3232                 }
3233         }
3234
3235         if (need_takeover_run) {
3236                 struct srvid_request rd;
3237                 TDB_DATA data;
3238
3239                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3240
3241                 rd.pnn = ctdb->pnn;
3242                 rd.srvid = 0;
3243                 data.dptr = (uint8_t *)&rd;
3244                 data.dsize = sizeof(rd);
3245
3246                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3247                 if (ret != 0) {
3248                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3249                 }
3250         }
3251         talloc_free(mem_ctx);
3252         return 0;
3253 }
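
/*
  Illustrative sketch (not part of this file): the request above sets
  rd.srvid = 0, so it is fire-and-forget and no completion reply is
  expected. A caller that wants to be told when the takeover run has
  finished would instead register a temporary message handler on a
  unique nonzero srvid and put that srvid in the request
  (my_unique_srvid and takeover_done_handler are hypothetical):

        struct srvid_request rd;
        TDB_DATA data;
        bool done = false;

        rd.pnn   = ctdb->pnn;
        rd.srvid = my_unique_srvid;     // must be nonzero
        ctdb_client_set_message_handler(ctdb, rd.srvid,
                                        takeover_done_handler, &done);
        data.dptr  = (uint8_t *)&rd;
        data.dsize = sizeof(rd);
        ctdb_client_send_message(ctdb, rec->recmaster,
                                 CTDB_SRVID_TAKEOVER_RUN, data);
*/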
3254
3255
3256 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3257 {
3258         struct ctdb_node_map **remote_nodemaps = callback_data;
3259
3260         if (node_pnn >= ctdb->num_nodes) {
3261                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3262                 return;
3263         }
3264
3265         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3266
3267 }
3268
3269 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3270         struct ctdb_node_map *nodemap,
3271         struct ctdb_node_map **remote_nodemaps)
3272 {
3273         uint32_t *nodes;
3274
3275         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3276         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3277                                         nodes, 0,
3278                                         CONTROL_TIMEOUT(), false, tdb_null,
3279                                         async_getnodemap_callback,
3280                                         NULL,
3281                                         remote_nodemaps) != 0) {
3282                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3283
3284                 return -1;
3285         }
3286
3287         return 0;
3288 }
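
/*
  Each reply lands in remote_nodemaps[] at the answering node's pnn;
  slots for nodes that never replied stay NULL, so callers (such as
  main_loop() below) must NULL-check each entry before using it.
*/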
3289
3290 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3291 struct ctdb_check_reclock_state {
3292         struct ctdb_context *ctdb;
3293         struct timeval start_time;
3294         int fd[2];
3295         pid_t child;
3296         struct timed_event *te;
3297         struct fd_event *fde;
3298         enum reclock_child_status status;
3299 };
3300
3301 /* when we free the reclock state we must kill any child process.
3302 */
3303 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3304 {
3305         struct ctdb_context *ctdb = state->ctdb;
3306
3307         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3308
3309         if (state->fd[0] != -1) {
3310                 close(state->fd[0]);
3311                 state->fd[0] = -1;
3312         }
3313         if (state->fd[1] != -1) {
3314                 close(state->fd[1]);
3315                 state->fd[1] = -1;
3316         }
3317         ctdb_kill(ctdb, state->child, SIGKILL);
3318         return 0;
3319 }
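
/*
  Hanging the cleanup off a talloc destructor means every exit path in
  check_recovery_lock() below can simply talloc_free(state): once the
  destructor has been installed, that one call reports the lock-check
  latency, closes both pipe ends and SIGKILLs the child.
*/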
3320
3321 /*
3322   called if our check_reclock child times out. this would happen if
3323   i/o to the reclock file blocks.
3324  */
3325 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3326                                          struct timeval t, void *private_data)
3327 {
3328         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3329                                            struct ctdb_check_reclock_state);
3330
3331         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3332         state->status = RECLOCK_TIMEOUT;
3333 }
3334
3335 /* this is called when the child process has completed checking the reclock
3336    file and has written data back to us through the pipe.
3337 */
3338 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3339                              uint16_t flags, void *private_data)
3340 {
3341         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3342                                              struct ctdb_check_reclock_state);
3343         char c = 0;
3344         int ret;
3345
3346         /* we got a response from our child process so we can abort the
3347            timeout.
3348         */
3349         talloc_free(state->te);
3350         state->te = NULL;
3351
3352         ret = sys_read(state->fd[0], &c, 1);
3353         if (ret != 1 || c != RECLOCK_OK) {
3354                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3355                 state->status = RECLOCK_FAILED;
3356
3357                 return;
3358         }
3359
3360         state->status = RECLOCK_OK;
3361         return;
3362 }
3363
3364 static int check_recovery_lock(struct ctdb_context *ctdb)
3365 {
3366         int ret;
3367         struct ctdb_check_reclock_state *state;
3368         pid_t parent = getpid();
3369
3370         if (ctdb->recovery_lock_fd == -1) {
3371                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3372                 return -1;
3373         }
3374
3375         state = talloc(ctdb, struct ctdb_check_reclock_state);
3376         CTDB_NO_MEMORY(ctdb, state);
3377
3378         state->ctdb = ctdb;
3379         state->start_time = timeval_current();
3380         state->status = RECLOCK_CHECKING;
3381         state->fd[0] = -1;
3382         state->fd[1] = -1;
3383
3384         ret = pipe(state->fd);
3385         if (ret != 0) {
3386                 talloc_free(state);
3387                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3388                 return -1;
3389         }
3390
3391         state->child = ctdb_fork(ctdb);
3392         if (state->child == (pid_t)-1) {
3393                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3394                 close(state->fd[0]);
3395                 state->fd[0] = -1;
3396                 close(state->fd[1]);
3397                 state->fd[1] = -1;
3398                 talloc_free(state);
3399                 return -1;
3400         }
3401
3402         if (state->child == 0) {
3403                 char cc = RECLOCK_OK;
3404                 close(state->fd[0]);
3405                 state->fd[0] = -1;
3406
3407                 ctdb_set_process_name("ctdb_rec_reclock");
3408                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3409                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3410                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3411                         cc = RECLOCK_FAILED;
3412                 }
3413
3414                 sys_write(state->fd[1], &cc, 1);
3415                 /* make sure we die when our parent dies */
3416                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3417                         sleep(5);
3418                 }
3419                 _exit(0);
3420         }
3421         close(state->fd[1]);
3422         state->fd[1] = -1;
3423         set_close_on_exec(state->fd[0]);
3424
3425         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3426
3427         talloc_set_destructor(state, check_reclock_destructor);
3428
3429         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3430                                     ctdb_check_reclock_timeout, state);
3431         if (state->te == NULL) {
3432                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3433                 talloc_free(state);
3434                 return -1;
3435         }
3436
3437         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3438                                 EVENT_FD_READ,
3439                                 reclock_child_handler,
3440                                 (void *)state);
3441
3442         if (state->fde == NULL) {
3443                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3444                 talloc_free(state);
3445                 return -1;
3446         }
3447         tevent_fd_set_auto_close(state->fde);
3448
3449         while (state->status == RECLOCK_CHECKING) {
3450                 event_loop_once(ctdb->ev);
3451         }
3452
3453         if (state->status == RECLOCK_FAILED) {
3454                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3455                 close(ctdb->recovery_lock_fd);
3456                 ctdb->recovery_lock_fd = -1;
3457                 talloc_free(state);
3458                 return -1;
3459         }
3460
3461         talloc_free(state);
3462         return 0;
3463 }
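
/*
  The pattern above - fork a child to do the possibly blocking I/O,
  talk to it over a pipe, and guard it with a timer - is how a hung
  cluster filesystem is kept from wedging the recovery daemon itself.
  Reduced to its skeleton (sketch only, error handling omitted;
  do_blocking_io() is a stand-in for the pread() on the lock fd):

        int fd[2];
        pipe(fd);
        if (ctdb_fork(ctdb) == 0) {
                char status = do_blocking_io() == 0 ? RECLOCK_OK
                                                    : RECLOCK_FAILED;
                sys_write(fd[1], &status, 1);
                _exit(0);
        }
        close(fd[1]);
        // parent: event_add_fd() on fd[0] plus a 15s event_add_timed()
        // guard, then event_loop_once() until one of them fires.
*/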
3464
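/* Resync our cached reclock state with the daemon's current setting.
   Four cases: the daemon has no reclock file (drop ours and disable
   verification); we had none cached (adopt the new one); the name is
   unchanged (nothing to do); or the name changed (adopt the new path,
   disable verification and close the stale fd).
*/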
3465 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3466 {
3467         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3468         const char *reclockfile;
3469
3470         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3471                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3472                 talloc_free(tmp_ctx);
3473                 return -1;      
3474         }
3475
3476         if (reclockfile == NULL) {
3477                 if (ctdb->recovery_lock_file != NULL) {
3478                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3479                         talloc_free(ctdb->recovery_lock_file);
3480                         ctdb->recovery_lock_file = NULL;
3481                         if (ctdb->recovery_lock_fd != -1) {
3482                                 close(ctdb->recovery_lock_fd);
3483                                 ctdb->recovery_lock_fd = -1;
3484                         }
3485                 }
3486                 ctdb->tunable.verify_recovery_lock = 0;
3487                 talloc_free(tmp_ctx);
3488                 return 0;
3489         }
3490
3491         if (ctdb->recovery_lock_file == NULL) {
3492                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3493                 if (ctdb->recovery_lock_fd != -1) {
3494                         close(ctdb->recovery_lock_fd);
3495                         ctdb->recovery_lock_fd = -1;
3496                 }
3497                 talloc_free(tmp_ctx);
3498                 return 0;
3499         }
3500
3501
3502         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3503                 talloc_free(tmp_ctx);
3504                 return 0;
3505         }
3506
3507         talloc_free(ctdb->recovery_lock_file);
3508         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3509         ctdb->tunable.verify_recovery_lock = 0;
3510         if (ctdb->recovery_lock_fd != -1) {
3511                 close(ctdb->recovery_lock_fd);
3512                 ctdb->recovery_lock_fd = -1;
3513         }
3514
3515         talloc_free(tmp_ctx);
3516         return 0;
3517 }
3518
3519 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3520                       TALLOC_CTX *mem_ctx)
3521 {
3522         uint32_t pnn;
3523         struct ctdb_node_map *nodemap=NULL;
3524         struct ctdb_node_map *recmaster_nodemap=NULL;
3525         struct ctdb_node_map **remote_nodemaps=NULL;
3526         struct ctdb_vnn_map *vnnmap=NULL;
3527         struct ctdb_vnn_map *remote_vnnmap=NULL;
3528         int32_t debug_level;
3529         int i, j, ret;
3530         bool self_ban;
3531
3532
3533         /* verify that the main daemon is still running */
3534         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3535                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3536                 exit(-1);
3537         }
3538
3539         /* ping the local daemon to tell it we are alive */
3540         ctdb_ctrl_recd_ping(ctdb);
3541
3542         if (rec->election_timeout) {
3543                 /* an election is in progress */
3544                 return;
3545         }
3546
3547         /* read the debug level from the parent and update locally */
3548         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3549         if (ret != 0) {
3550                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3551                 return;
3552         }
3553         DEBUGLEVEL = debug_level;
3554
3555         /* get relevant tunables */
3556         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3557         if (ret != 0) {
3558                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3559                 return;
3560         }
3561
3562         /* get runstate */
3563         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3564                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3565         if (ret != 0) {
3566                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3567                 return;
3568         }
3569
3570         /* get the current recovery lock file from the server */
3571         if (update_recovery_lock_file(ctdb) != 0) {
3572                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3573                 return;
3574         }
3575
3576         /* Make sure that if recovery lock verification becomes disabled,
3577            we close the file
3578         */
3579         if (ctdb->tunable.verify_recovery_lock == 0) {
3580                 if (ctdb->recovery_lock_fd != -1) {
3581                         close(ctdb->recovery_lock_fd);
3582                         ctdb->recovery_lock_fd = -1;
3583                 }
3584         }
3585
3586         pnn = ctdb_get_pnn(ctdb);
3587
3588         /* get the vnnmap */
3589         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3590         if (ret != 0) {
3591                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3592                 return;
3593         }
3594
3595
3596         /* get number of nodes */
3597         if (rec->nodemap) {
3598                 talloc_free(rec->nodemap);
3599                 rec->nodemap = NULL;
3600                 nodemap=NULL;
3601         }
3602         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3603         if (ret != 0) {
3604                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3605                 return;
3606         }
3607         nodemap = rec->nodemap;
3608
3609         /* remember our own node flags */
3610         rec->node_flags = nodemap->nodes[pnn].flags;
3611
3612         ban_misbehaving_nodes(rec, &self_ban);
3613         if (self_ban) {
3614                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3615                 return;
3616         }
3617
3618         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3619            also frozen and that the recmode is set to active.
3620         */
3621         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3622                 /* If this node has become inactive then we want to
3623                  * reduce the chances of it taking over the recovery
3624                  * master role when it becomes active again.  This
3625                  * helps to stabilise the recovery master role so that
3626                  * it stays on the most stable node.
3627                  */
3628                 rec->priority_time = timeval_current();
3629
3630                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3631                 if (ret != 0) {
3632                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3633                 }
3634                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3635                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3636
3637                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3638                         if (ret != 0) {
3639                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3640
3641                                 return;
3642                         }
3643                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3644                         if (ret != 0) {
3645                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3646                                 return;
3647                         }
3648                 }
3649
3650                 /* If this node is stopped or banned then it is not the recovery
3651                  * master, so don't do anything. This prevents a stopped or banned
3652                  * node from starting an election and sending unnecessary controls.
3653                  */
3654                 return;
3655         }
3656
3657         /* check which node is the recovery master */
3658         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3659         if (ret != 0) {
3660                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3661                 return;
3662         }
3663
3664         /* If we are not the recmaster then do some housekeeping */
3665         if (rec->recmaster != pnn) {
3666                 /* Ignore any IP reallocate requests - only recmaster
3667                  * processes them
3668                  */
3669                 TALLOC_FREE(rec->reallocate_requests);
3670                 /* Clear any nodes that should be force rebalanced in
3671                  * the next takeover run.  If the recovery master role
3672                  * has moved then we don't want to process these some
3673                  * time in the future.
3674                  */
3675                 TALLOC_FREE(rec->force_rebalance_nodes);
3676         }
3677
3678         /* This is a special case.  When the recovery daemon is started, recmaster
3679          * is set to -1.  If the node was not started in the stopped state, then
3680          * start an election to decide the recovery master
3681          */
3682         if (rec->recmaster == (uint32_t)-1) {
3683                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3684                 force_election(rec, pnn, nodemap);
3685                 return;
3686         }
3687
3688         /* update the capabilities for all nodes */
3689         ret = update_capabilities(ctdb, nodemap);
3690         if (ret != 0) {
3691                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3692                 return;
3693         }
3694
3695         /*
3696          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3697          * but we have, then force an election and try to become the new
3698          * recmaster.
3699          */
3700         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3701             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3702              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3703                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3704                                   " but we (node %u) have - force an election\n",
3705                                   rec->recmaster, pnn));
3706                 force_election(rec, pnn, nodemap);
3707                 return;
3708         }
3709
3710         /* count how many active nodes there are */
3711         rec->num_active    = 0;
3712         rec->num_lmasters  = 0;
3713         rec->num_connected = 0;
3714         for (i=0; i<nodemap->num; i++) {
3715                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3716                         rec->num_active++;
3717                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3718                                 rec->num_lmasters++;
3719                         }
3720                 }
3721                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3722                         rec->num_connected++;
3723                 }
3724         }
3725
3726
3727         /* verify that the recmaster node is still active */
3728         for (j=0; j<nodemap->num; j++) {
3729                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3730                         break;
3731                 }
3732         }
3733
3734         if (j == nodemap->num) {
3735                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3736                 force_election(rec, pnn, nodemap);
3737                 return;
3738         }
3739
3740         /* if recovery master is disconnected we must elect a new recmaster */
3741         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3742                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3743                 force_election(rec, pnn, nodemap);
3744                 return;
3745         }
3746
3747         /* get nodemap from the recovery master to check if it is inactive */
3748         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3749                                    mem_ctx, &recmaster_nodemap);
3750         if (ret != 0) {
3751                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3752                           nodemap->nodes[j].pnn));
3753                 return;
3754         }
3755
3756
3757         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3758             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3759                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3760                 /*
3761                  * update our nodemap to carry the recmaster's notion of
3762                  * its own flags, so that we don't keep freezing the
3763                  * inactive recmaster node...
3764                  */
3765                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3766                 force_election(rec, pnn, nodemap);
3767                 return;
3768         }
3769
3770         /* verify that we have all ip addresses we should have and we don't
3771          * have addresses we shouldn't have.
3772          */
3773         if (ctdb->tunable.disable_ip_failover == 0 &&
3774             rec->takeover_runs_disable_ctx == NULL) {
3775                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3776                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3777                 }
3778         }
3779
3780
3781         /* if we are not the recmaster then we do not need to check
3782            if recovery is needed
3783          */
3784         if (pnn != rec->recmaster) {
3785                 return;
3786         }
3787
3788
3789         /* ensure our local copies of flags are right */
3790         ret = update_local_flags(rec, nodemap);
3791         if (ret == MONITOR_ELECTION_NEEDED) {
3792                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3793                 force_election(rec, pnn, nodemap);
3794                 return;
3795         }
3796         if (ret != MONITOR_OK) {
3797                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3798                 return;
3799         }
3800
3801         if (ctdb->num_nodes != nodemap->num) {
3802                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3803                 ctdb_load_nodes_file(ctdb);
3804                 return;
3805         }
3806
3807         /* verify that all active nodes agree that we are the recmaster */
3808         switch (verify_recmaster(rec, nodemap, pnn)) {
3809         case MONITOR_RECOVERY_NEEDED:
3810                 /* cannot happen */
3811                 return;
3812         case MONITOR_ELECTION_NEEDED:
3813                 force_election(rec, pnn, nodemap);
3814                 return;
3815         case MONITOR_OK:
3816                 break;
3817         case MONITOR_FAILED:
3818                 return;
3819         }
3820
3821
3822         if (rec->need_recovery) {
3823                 /* a previous recovery didn't finish */
3824                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3825                 return;
3826         }
3827
3828         /* verify that all active nodes are in normal mode 
3829            and not in recovery mode 
3830         */
3831         switch (verify_recmode(ctdb, nodemap)) {
3832         case MONITOR_RECOVERY_NEEDED:
3833                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3834                 return;
3835         case MONITOR_FAILED:
3836                 return;
3837         case MONITOR_ELECTION_NEEDED:
3838                 /* cannot happen */
3839         case MONITOR_OK:
3840                 break;
3841         }
3842
3843
3844         if (ctdb->tunable.verify_recovery_lock != 0) {
3845                 /* we should have the reclock - check it's not stale */
3846                 ret = check_recovery_lock(ctdb);
3847                 if (ret != 0) {
3848                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3849                         ctdb_set_culprit(rec, ctdb->pnn);
3850                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3851                         return;
3852                 }
3853         }
3854
3855
3856         /* if there are takeover runs requested, perform them and notify the waiters */
3857         if (rec->takeover_runs_disable_ctx == NULL &&
3858             rec->reallocate_requests) {
3859                 process_ipreallocate_requests(ctdb, rec);
3860         }
3861
3862         /* get the nodemap for all active remote nodes
3863          */
3864         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3865         if (remote_nodemaps == NULL) {
3866                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3867                 return;
3868         }
3869         for(i=0; i<nodemap->num; i++) {
3870                 remote_nodemaps[i] = NULL;
3871         }
3872         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3873                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3874                 return;
3875         } 
3876
3877         /* verify that all other nodes have the same nodemap as we have
3878         */
3879         for (j=0; j<nodemap->num; j++) {
3880                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3881                         continue;
3882                 }
3883
3884                 if (remote_nodemaps[j] == NULL) {
3885                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3886                         ctdb_set_culprit(rec, j);
3887
3888                         return;
3889                 }
3890
3891                 /* if the nodes disagree on how many nodes there are
3892                    then this is a good reason to try recovery
3893                  */
3894                 if (remote_nodemaps[j]->num != nodemap->num) {
3895                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3896                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3897                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3898                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3899                         return;
3900                 }
3901
3902                 /* if the nodes disagree on which nodes exist and are
3903                    active, then that is also a good reason to do recovery
3904                  */
3905                 for (i=0;i<nodemap->num;i++) {
3906                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3907                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3908                                           nodemap->nodes[j].pnn, i, 
3909                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3910                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3911                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3912                                             vnnmap);
3913                                 return;
3914                         }
3915                 }
3916         }
3917
3918         /*
3919          * Update node flags obtained from each active node. This ensures we have
3920          * up-to-date information for all the nodes.
3921          */
3922         for (j=0; j<nodemap->num; j++) {
3923                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3924                         continue;
3925                 }
3926                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3927         }
3928
3929         for (j=0; j<nodemap->num; j++) {
3930                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3931                         continue;
3932                 }
3933
3934                 /* verify the flags are consistent
3935                 */
3936                 for (i=0; i<nodemap->num; i++) {
3937                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3938                                 continue;
3939                         }
3940                         
3941                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3942                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3943                                   nodemap->nodes[j].pnn, 
3944                                   nodemap->nodes[i].pnn, 
3945                                   remote_nodemaps[j]->nodes[i].flags,
3946                                   nodemap->nodes[i].flags));
3947                                 if (i == j) {
3948                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3949                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3950                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3951                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3952                                                     vnnmap);
3953                                         return;
3954                                 } else {
3955                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3956                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3957                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3958                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3959                                                     vnnmap);
3960                                         return;
3961                                 }
3962                         }
3963                 }
3964         }
3965
3966
3967         /* There must be the same number of lmasters in the vnn map as
3968          * there are active nodes with the lmaster capability...  or
3969          * do a recovery.
3970          */
3971         if (vnnmap->size != rec->num_lmasters) {
3972                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3973                           vnnmap->size, rec->num_lmasters));
3974                 ctdb_set_culprit(rec, ctdb->pnn);
3975                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3976                 return;
3977         }
3978
3979         /* verify that all active nodes in the nodemap also exist in 
3980            the vnnmap.
3981          */
3982         for (j=0; j<nodemap->num; j++) {
3983                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3984                         continue;
3985                 }
3986                 if (nodemap->nodes[j].pnn == pnn) {
3987                         continue;
3988                 }
3989
3990                 for (i=0; i<vnnmap->size; i++) {
3991                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3992                                 break;
3993                         }
3994                 }
3995                 if (i == vnnmap->size) {
3996                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3997                                   nodemap->nodes[j].pnn));
3998                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3999                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4000                         return;
4001                 }
4002         }
4003
4004         
4005         /* verify that all other nodes have the same vnnmap
4006            and are from the same generation
4007          */
4008         for (j=0; j<nodemap->num; j++) {
4009                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
4010                         continue;
4011                 }
4012                 if (nodemap->nodes[j].pnn == pnn) {
4013                         continue;
4014                 }
4015
4016                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4017                                           mem_ctx, &remote_vnnmap);
4018                 if (ret != 0) {
4019                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4020                                   nodemap->nodes[j].pnn));
4021                         return;
4022                 }
4023
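                /* The generation number is bumped on every recovery, so a
                 * mismatch means the remote node took part in a recovery
                 * that we did not (or vice versa). */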
4024                 /* verify the vnnmap generation is the same */
4025                 if (vnnmap->generation != remote_vnnmap->generation) {
4026                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4027                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4028                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4029                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4030                         return;
4031                 }
4032
4033                 /* verify the vnnmap size is the same */
4034                 if (vnnmap->size != remote_vnnmap->size) {
4035                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
4036                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4037                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4038                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4039                         return;
4040                 }
4041
4042                 /* verify the vnnmap is the same */
4043                 for (i=0; i<vnnmap->size; i++) {
4044                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4045                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
4046                                           nodemap->nodes[j].pnn));
4047                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4048                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4049                                             vnnmap);
4050                                 return;
4051                         }
4052                 }
4053         }
4054
4055         /* we might need to change who has what IP assigned */
4056         if (rec->need_takeover_run) {
4057                 uint32_t culprit = (uint32_t)-1;
4058
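                /* Clear the flag up front; every failure path below must
                 * re-arm it (or escalate to a full recovery), otherwise
                 * the takeover run would never be retried. */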
4059                 rec->need_takeover_run = false;
4060
4061                 /* update the list of public ips that a node can handle for
4062                    all connected nodes
4063                 */
4064                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4065                 if (ret != 0) {
4066                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4067                                          culprit));
4068                         rec->need_takeover_run = true;
4069                         return;
4070                 }
4071
4072                 /* execute the "startrecovery" event script on all nodes */
4073                 ret = run_startrecovery_eventscript(rec, nodemap);
4074                 if (ret != 0) {
4075                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4076                         ctdb_set_culprit(rec, ctdb->pnn);
4077                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4078                         return;
4079                 }
4080
4081                 /* If the takeover run fails, the offending nodes are
4082                  * assigned ban culprit counts and the run is retried;
4083                  * nodes that keep failing will eventually get banned.
4084                  *
4085                  * If rec->need_takeover_run is not set to true on such
4086                  * a failure, monitoring stays disabled cluster-wide
4087                  * (via the startrecovery eventscript) and will never
4088                  * be re-enabled.
4089                  */
4090                 if (!do_takeover_run(rec, nodemap, true)) {
4091                         return;
4092                 }
4093
4094                 /* execute the "recovered" event script on all nodes */
4095                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4096 #if 0
4097 // We can't check whether the event completed successfully,
4098 // since this script WILL fail if the node is in recovery mode.
4099 // If that race happens, the code here would just cause a second,
4100 // cascading recovery.
4101                 if (ret!=0) {
4102                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4103                         ctdb_set_culprit(rec, ctdb->pnn);
4104                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4105                 }
4106 #endif
4107         }
4108 }
4109
4110 /*
4111   the main monitoring loop
4112  */
4113 static void monitor_cluster(struct ctdb_context *ctdb)
4114 {
4115         struct ctdb_recoverd *rec;
4116
4117         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4118
4119         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4120         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4121
4122         rec->ctdb = ctdb;
4123
4124         rec->takeover_run_in_progress = false;
4125
4126         rec->priority_time = timeval_current();
4127
4128         /* register a message port for sending memory dumps */
4129         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4130
4131         /* register a message port for recovery elections */
4132         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4133
4134         /* when nodes are disabled/enabled */
4135         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4136
4137         /* when we are asked to push out a flag change */
4138         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4139
4140         /* register a message port for vacuum fetch */
4141         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4142
4143         /* register a message port for reloadnodes  */
4144         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4145
4146         /* register a message port for performing a takeover run */
4147         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4148
4149         /* register a message port for disabling the ip check for a short while */
4150         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4151
4152         /* register a message port for updating the recovery daemons node assignment for an ip */
4153         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4154
4155         /* register a message port for forcing a rebalance of a node next
4156            reallocation */
4157         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4158
4159         /* Register a message port for disabling takeover runs */
4160         ctdb_client_set_message_handler(ctdb,
4161                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4162                                         disable_takeover_runs_handler, rec);
4163
4164         /* register a message port for detaching database */
4165         ctdb_client_set_message_handler(ctdb,
4166                                         CTDB_SRVID_DETACH_DATABASE,
4167                                         detach_database_handler, rec);
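
        /* Each registration above binds a 64-bit SRVID "message port" to
         * a handler in this daemon.  Any ctdb client can trigger one with
         * ctdb_client_send_message().  A minimal sketch (hypothetical
         * caller; payload formats vary per handler --
         * CTDB_SRVID_REBALANCE_NODE, for instance, expects the pnn of the
         * node to rebalance):
         *
         *   uint32_t target = 2;    // pnn of the node to rebalance
         *   TDB_DATA data = { .dptr = (uint8_t *)&target,
         *                     .dsize = sizeof(target) };
         *   ctdb_client_send_message(ctdb, recmaster_pnn,
         *                            CTDB_SRVID_REBALANCE_NODE, data);
         */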
4168
4169         for (;;) {
4170                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4171                 struct timeval start;
4172                 double elapsed;
4173
4174                 if (!mem_ctx) {
4175                         DEBUG(DEBUG_CRIT,(__location__
4176                                           " Failed to create temp context\n"));
4177                         exit(-1);
4178                 }
4179
4180                 start = timeval_current();
4181                 main_loop(ctdb, rec, mem_ctx);
4182                 talloc_free(mem_ctx);
4183
4184                 /* we only check for recovery once every RecoverInterval seconds (1 by default) */
4185                 elapsed = timeval_elapsed(&start);
4186                 if (elapsed < ctdb->tunable.recover_interval) {
4187                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4188                                           - elapsed);
4189                 }
4190         }
4191 }
4192
4193 /*
4194   event handler for when the main ctdbd dies
4195  */
4196 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4197                                  uint16_t flags, void *private_data)
4198 {
4199         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4200         _exit(1);
4201 }
4202
4203 /*
4204   called regularly to verify that the recovery daemon is still running
4205  */
4206 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4207                               struct timeval t, void *p)
4208 {
4209         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4210
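        /* Signal 0 delivers nothing; it only probes whether the recovery
         * daemon's pid still exists. */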
4211         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4212                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4213
4214                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4215                                 ctdb_restart_recd, ctdb);
4216
4217                 return;
4218         }
4219
4220         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4221                         timeval_current_ofs(30, 0),
4222                         ctdb_check_recd, ctdb);
4223 }
4224
4225 static void recd_sig_child_handler(struct event_context *ev,
4226         struct signal_event *se, int signum, int count,
4227         void *dont_care, 
4228         void *private_data)
4229 {
4230 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4231         int status;
4232         pid_t pid = -1;
4233
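        /* Reap all exited children without blocking: with WNOHANG,
         * waitpid() returns 0 once no more dead children are pending,
         * or -1 with ECHILD when there are no children left at all. */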
4234         while (pid != 0) {
4235                 pid = waitpid(-1, &status, WNOHANG);
4236                 if (pid == -1) {
4237                         if (errno != ECHILD) {
4238                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4239                         }
4240                         return;
4241                 }
4242                 if (pid > 0) {
4243                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4244                 }
4245         }
4246 }
4247
4248 /*
4249   start up the recovery daemon as a child of the main ctdb daemon
4250  */
4251 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4252 {
4253         int fd[2];
4254         struct signal_event *se;
4255         struct tevent_fd *fde;
4256
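        /* Watchdog pipe: the parent keeps fd[1] and the child watches
         * fd[0].  When the parent exits, fd[1] is closed by the kernel,
         * fd[0] becomes readable (EOF) and ctdb_recoverd_parent() makes
         * the orphaned child exit as well. */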
4257         if (pipe(fd) != 0) {
4258                 return -1;
4259         }
4260
4261         ctdb->recoverd_pid = ctdb_fork(ctdb);
4262         if (ctdb->recoverd_pid == -1) {
4263                 return -1;
4264         }
4265
4266         if (ctdb->recoverd_pid != 0) {
4267                 talloc_free(ctdb->recd_ctx);
4268                 ctdb->recd_ctx = talloc_new(ctdb);
4269                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4270
4271                 close(fd[0]);
4272                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4273                                 timeval_current_ofs(30, 0),
4274                                 ctdb_check_recd, ctdb);
4275                 return 0;
4276         }
4277
4278         close(fd[1]);
4279
4280         srandom(getpid() ^ time(NULL));
4281
4282         ctdb_set_process_name("ctdb_recoverd");
4283         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4284                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4285                 exit(1);
4286         }
4287
4288         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4289
4290         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4291                      ctdb_recoverd_parent, &fd[0]);
4292         tevent_fd_set_auto_close(fde);
4293
4294         /* set up a handler to pick up sigchld */
4295         se = event_add_signal(ctdb->ev, ctdb,
4296                                      SIGCHLD, 0,
4297                                      recd_sig_child_handler,
4298                                      ctdb);
4299         if (se == NULL) {
4300                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4301                 exit(1);
4302         }
4303
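        /* Enter the monitoring loop; this never returns in normal
         * operation. */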
4304         monitor_cluster(ctdb);
4305
4306         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4307         return -1;
4308 }
4309
4310 /*
4311   shutdown the recovery daemon
4312  */
4313 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4314 {
4315         if (ctdb->recoverd_pid == 0) {
4316                 return;
4317         }
4318
4319         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4320         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4321
4322         TALLOC_FREE(ctdb->recd_ctx);
4323         TALLOC_FREE(ctdb->recd_ping_count);
4324 }
4325
4326 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4327                        struct timeval t, void *private_data)
4328 {
4329         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4330
4331         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4332         ctdb_stop_recoverd(ctdb);
4333         ctdb_start_recoverd(ctdb);
4334 }