Restart recovery dameon if it looks like it hung.
[kamenim/samba-autobuild/.git] / ctdb / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "db_wrap.h"
30
31 /*
32   lock all databases - mark only
33  */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb, uint32_t priority)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
39                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
40                 return -1;
41         }
42
43         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
44                 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
45                 return -1;
46         }
47         /* The dual loop is a woraround for older versions of samba
48            that does not yet support the set-db-priority/lock order
49            call. So that we get basic deadlock avoiidance also for
50            these old versions of samba.
51            This code will be removed in the future.
52         */
53         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
54                 if (ctdb_db->priority != priority) {
55                         continue;
56                 }
57                 if (strstr(ctdb_db->db_name, "notify") != NULL) {
58                         continue;
59                 }
60                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
61                         return -1;
62                 }
63                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
64                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
65                         return -1;
66                 }
67         }
68         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
69                 if (ctdb_db->priority != priority) {
70                         continue;
71                 }
72                 if (strstr(ctdb_db->db_name, "notify") == NULL) {
73                         continue;
74                 }
75                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
76                         return -1;
77                 }
78                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
79                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
80                         return -1;
81                 }
82         }
83         return 0;
84 }
85
86 /*
87   lock all databases - unmark only
88  */
89 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb, uint32_t priority)
90 {
91         struct ctdb_db_context *ctdb_db;
92
93         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
94                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
95                 return -1;
96         }
97
98         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
99                 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
100                 return -1;
101         }
102         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
103                 if (ctdb_db->priority != priority) {
104                         continue;
105                 }
106                 tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
107                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
108                         return -1;
109                 }
110         }
111         return 0;
112 }
113
114
115 int 
116 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
117 {
118         CHECK_CONTROL_DATA_SIZE(0);
119         struct ctdb_vnn_map_wire *map;
120         size_t len;
121
122         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
123         map = talloc_size(outdata, len);
124         CTDB_NO_MEMORY(ctdb, map);
125
126         map->generation = ctdb->vnn_map->generation;
127         map->size = ctdb->vnn_map->size;
128         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
129
130         outdata->dsize = len;
131         outdata->dptr  = (uint8_t *)map;
132
133         return 0;
134 }
135
136 int 
137 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
138 {
139         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
140         int i;
141
142         for(i=1; i<=NUM_DB_PRIORITIES; i++) {
143                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
144                         DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
145                         return -1;
146                 }
147         }
148
149         talloc_free(ctdb->vnn_map);
150
151         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
152         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
153
154         ctdb->vnn_map->generation = map->generation;
155         ctdb->vnn_map->size       = map->size;
156         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
157         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
158
159         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
160
161         return 0;
162 }
163
164 int 
165 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
166 {
167         uint32_t i, len;
168         struct ctdb_db_context *ctdb_db;
169         struct ctdb_dbid_map *dbid_map;
170
171         CHECK_CONTROL_DATA_SIZE(0);
172
173         len = 0;
174         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
175                 len++;
176         }
177
178
179         outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
180         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
181         if (!outdata->dptr) {
182                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
183                 exit(1);
184         }
185
186         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
187         dbid_map->num = len;
188         for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
189                 dbid_map->dbs[i].dbid       = ctdb_db->db_id;
190                 dbid_map->dbs[i].persistent = ctdb_db->persistent;
191         }
192
193         return 0;
194 }
195
196 int 
197 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
198 {
199         uint32_t i, num_nodes;
200         struct ctdb_node_map *node_map;
201
202         CHECK_CONTROL_DATA_SIZE(0);
203
204         num_nodes = ctdb->num_nodes;
205
206         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
207         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
208         if (!outdata->dptr) {
209                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
210                 exit(1);
211         }
212
213         node_map = (struct ctdb_node_map *)outdata->dptr;
214         node_map->num = num_nodes;
215         for (i=0; i<num_nodes; i++) {
216                 if (parse_ip(ctdb->nodes[i]->address.address,
217                              NULL, /* TODO: pass in the correct interface here*/
218                              0,
219                              &node_map->nodes[i].addr) == 0)
220                 {
221                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
222                 }
223
224                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
225                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
226         }
227
228         return 0;
229 }
230
231 /*
232    get an old style ipv4-only nodemap
233 */
234 int 
235 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
236 {
237         uint32_t i, num_nodes;
238         struct ctdb_node_mapv4 *node_map;
239
240         CHECK_CONTROL_DATA_SIZE(0);
241
242         num_nodes = ctdb->num_nodes;
243
244         outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
245         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
246         if (!outdata->dptr) {
247                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
248                 exit(1);
249         }
250
251         node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
252         node_map->num = num_nodes;
253         for (i=0; i<num_nodes; i++) {
254                 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
255                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
256                         return -1;
257                 }
258
259                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
260                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
261         }
262
263         return 0;
264 }
265
266 static void
267 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te, 
268                                struct timeval t, void *private_data)
269 {
270         int i, num_nodes;
271         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
272         TALLOC_CTX *tmp_ctx;
273         struct ctdb_node **nodes;       
274
275         tmp_ctx = talloc_new(ctdb);
276
277         /* steal the old nodes file for a while */
278         talloc_steal(tmp_ctx, ctdb->nodes);
279         nodes = ctdb->nodes;
280         ctdb->nodes = NULL;
281         num_nodes = ctdb->num_nodes;
282         ctdb->num_nodes = 0;
283
284         /* load the new nodes file */
285         ctdb_load_nodes_file(ctdb);
286
287         for (i=0; i<ctdb->num_nodes; i++) {
288                 /* keep any identical pre-existing nodes and connections */
289                 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
290                         talloc_free(ctdb->nodes[i]);
291                         ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
292                         continue;
293                 }
294
295                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
296                         continue;
297                 }
298
299                 /* any new or different nodes must be added */
300                 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
301                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
302                         ctdb_fatal(ctdb, "failed to add node. shutting down\n");
303                 }
304                 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
305                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
306                         ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
307                 }
308         }
309
310         /* tell the recovery daemon to reaload the nodes file too */
311         ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
312
313         talloc_free(tmp_ctx);
314         return;
315 }
316
317 /*
318   reload the nodes file after a short delay (so that we can send the response
319   back first
320 */
321 int 
322 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
323 {
324         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
325
326         return 0;
327 }
328
329 /* 
330    a traverse function for pulling all relevent records from pulldb
331  */
332 struct pulldb_data {
333         struct ctdb_context *ctdb;
334         struct ctdb_marshall_buffer *pulldata;
335         uint32_t len;
336         bool failed;
337 };
338
339 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
340 {
341         struct pulldb_data *params = (struct pulldb_data *)p;
342         struct ctdb_rec_data *rec;
343
344         /* add the record to the blob */
345         rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
346         if (rec == NULL) {
347                 params->failed = true;
348                 return -1;
349         }
350         params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
351         if (params->pulldata == NULL) {
352                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
353                 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
354         }
355         params->pulldata->count++;
356         memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
357         params->len += rec->length;
358         talloc_free(rec);
359
360         return 0;
361 }
362
363 /*
364   pul a bunch of records from a ltdb, filtering by lmaster
365  */
366 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
367 {
368         struct ctdb_control_pulldb *pull;
369         struct ctdb_db_context *ctdb_db;
370         struct pulldb_data params;
371         struct ctdb_marshall_buffer *reply;
372
373         pull = (struct ctdb_control_pulldb *)indata.dptr;
374         
375         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
376         if (!ctdb_db) {
377                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
378                 return -1;
379         }
380
381         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
382                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
383                 return -1;
384         }
385
386         reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
387         CTDB_NO_MEMORY(ctdb, reply);
388
389         reply->db_id = pull->db_id;
390
391         params.ctdb = ctdb;
392         params.pulldata = reply;
393         params.len = offsetof(struct ctdb_marshall_buffer, data);
394         params.failed = false;
395
396         if (ctdb_db->unhealthy_reason) {
397                 /* this is just a warning, as the tdb should be empty anyway */
398                 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
399                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
400         }
401
402         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
404                 return -1;
405         }
406
407         if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
408                 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
409                 ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
410                 talloc_free(params.pulldata);
411                 return -1;
412         }
413
414         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
415
416         outdata->dptr = (uint8_t *)params.pulldata;
417         outdata->dsize = params.len;
418
419         return 0;
420 }
421
422 /*
423   push a bunch of records into a ltdb, filtering by rsn
424  */
425 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
426 {
427         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
428         struct ctdb_db_context *ctdb_db;
429         int i, ret;
430         struct ctdb_rec_data *rec;
431
432         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
433                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
434                 return -1;
435         }
436
437         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
438         if (!ctdb_db) {
439                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
440                 return -1;
441         }
442
443         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
444                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
445                 return -1;
446         }
447
448         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
450                 return -1;
451         }
452
453         rec = (struct ctdb_rec_data *)&reply->data[0];
454
455         DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
456                  reply->count, reply->db_id));
457
458         for (i=0;i<reply->count;i++) {
459                 TDB_DATA key, data;
460                 struct ctdb_ltdb_header *hdr;
461
462                 key.dptr = &rec->data[0];
463                 key.dsize = rec->keylen;
464                 data.dptr = &rec->data[key.dsize];
465                 data.dsize = rec->datalen;
466
467                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
468                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
469                         goto failed;
470                 }
471                 hdr = (struct ctdb_ltdb_header *)data.dptr;
472                 data.dptr += sizeof(*hdr);
473                 data.dsize -= sizeof(*hdr);
474
475                 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
478                         goto failed;
479                 }
480
481                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
482         }           
483
484         DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
485                  reply->count, reply->db_id));
486
487         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
488         return 0;
489
490 failed:
491         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
492         return -1;
493 }
494
495
496 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
497 {
498         uint32_t *dmaster = (uint32_t *)p;
499         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
500         int ret;
501
502         /* skip if already correct */
503         if (header->dmaster == *dmaster) {
504                 return 0;
505         }
506
507         header->dmaster = *dmaster;
508
509         ret = tdb_store(tdb, key, data, TDB_REPLACE);
510         if (ret) {
511                 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back  ret:%d\n",ret));
512                 return ret;
513         }
514
515         /* TODO: add error checking here */
516
517         return 0;
518 }
519
520 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
521 {
522         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
523         struct ctdb_db_context *ctdb_db;
524
525         ctdb_db = find_ctdb_db(ctdb, p->db_id);
526         if (!ctdb_db) {
527                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
528                 return -1;
529         }
530
531         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
532                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
533                 return -1;
534         }
535
536         if (ctdb_lock_all_databases_mark(ctdb,  ctdb_db->priority) != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
538                 return -1;
539         }
540
541         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
542
543         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
544         
545         return 0;
546 }
547
548 struct ctdb_set_recmode_state {
549         struct ctdb_context *ctdb;
550         struct ctdb_req_control *c;
551         uint32_t recmode;
552         int fd[2];
553         struct timed_event *te;
554         struct fd_event *fde;
555         pid_t child;
556         struct timeval start_time;
557 };
558
559 /*
560   called if our set_recmode child times out. this would happen if
561   ctdb_recovery_lock() would block.
562  */
563 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te, 
564                                          struct timeval t, void *private_data)
565 {
566         struct ctdb_set_recmode_state *state = talloc_get_type(private_data, 
567                                            struct ctdb_set_recmode_state);
568
569         /* we consider this a success, not a failure, as we failed to
570            set the recovery lock which is what we wanted.  This can be
571            caused by the cluster filesystem being very slow to
572            arbitrate locks immediately after a node failure.       
573          */
574         DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
575         state->ctdb->recovery_mode = state->recmode;
576         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
577         talloc_free(state);
578 }
579
580
581 /* when we free the recmode state we must kill any child process.
582 */
583 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
584 {
585         double l = timeval_elapsed(&state->start_time);
586
587         CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
588
589         if (state->fd[0] != -1) {
590                 state->fd[0] = -1;
591         }
592         if (state->fd[1] != -1) {
593                 state->fd[1] = -1;
594         }
595         kill(state->child, SIGKILL);
596         return 0;
597 }
598
599 /* this is called when the client process has completed ctdb_recovery_lock()
600    and has written data back to us through the pipe.
601 */
602 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde, 
603                              uint16_t flags, void *private_data)
604 {
605         struct ctdb_set_recmode_state *state= talloc_get_type(private_data, 
606                                              struct ctdb_set_recmode_state);
607         char c = 0;
608         int ret;
609
610         /* we got a response from our child process so we can abort the
611            timeout.
612         */
613         talloc_free(state->te);
614         state->te = NULL;
615
616
617         /* read the childs status when trying to lock the reclock file.
618            child wrote 0 if everything is fine and 1 if it did manage
619            to lock the file, which would be a problem since that means
620            we got a request to exit from recovery but we could still lock
621            the file   which at this time SHOULD be locked by the recovery
622            daemon on the recmaster
623         */              
624         ret = read(state->fd[0], &c, 1);
625         if (ret != 1 || c != 0) {
626                 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
627                 talloc_free(state);
628                 return;
629         }
630
631         state->ctdb->recovery_mode = state->recmode;
632
633         /* release any deferred attach calls from clients */
634         if (state->recmode == CTDB_RECOVERY_NORMAL) {
635                 ctdb_process_deferred_attach(state->ctdb);
636         }
637
638         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
639         talloc_free(state);
640         return;
641 }
642
643 static void
644 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te, 
645                                struct timeval t, void *private_data)
646 {
647         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
648
649         DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
650         talloc_free(ctdb->release_ips_ctx);
651         ctdb->release_ips_ctx = NULL;
652
653         ctdb_release_all_ips(ctdb);
654 }
655
656 /*
657  * Set up an event to drop all public ips if we remain in recovery for too
658  * long
659  */
660 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
661 {
662         if (ctdb->release_ips_ctx != NULL) {
663                 talloc_free(ctdb->release_ips_ctx);
664         }
665         ctdb->release_ips_ctx = talloc_new(ctdb);
666         CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
667
668         event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
669         return 0;
670 }
671
672 /*
673   set the recovery mode
674  */
675 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
676                                  struct ctdb_req_control *c,
677                                  TDB_DATA indata, bool *async_reply,
678                                  const char **errormsg)
679 {
680         uint32_t recmode = *(uint32_t *)indata.dptr;
681         int i, ret;
682         struct ctdb_set_recmode_state *state;
683         pid_t parent = getpid();
684
685         /* if we enter recovery but stay in recovery for too long
686            we will eventually drop all our ip addresses
687         */
688         if (recmode == CTDB_RECOVERY_NORMAL) {
689                 talloc_free(ctdb->release_ips_ctx);
690                 ctdb->release_ips_ctx = NULL;
691         } else {
692                 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
693                         DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
694                 }
695         }
696
697         if (recmode != ctdb->recovery_mode) {
698                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n", 
699                          recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
700         }
701
702         if (recmode != CTDB_RECOVERY_NORMAL ||
703             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
704                 ctdb->recovery_mode = recmode;
705                 return 0;
706         }
707
708         /* some special handling when ending recovery mode */
709
710         /* force the databases to thaw */
711         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
712                 if (ctdb->freeze_handles[i] != NULL) {
713                         ctdb_control_thaw(ctdb, i);
714                 }
715         }
716
717         state = talloc(ctdb, struct ctdb_set_recmode_state);
718         CTDB_NO_MEMORY(ctdb, state);
719
720         state->start_time = timeval_current();
721         state->fd[0] = -1;
722         state->fd[1] = -1;
723
724         /* release any deferred attach calls from clients */
725         if (recmode == CTDB_RECOVERY_NORMAL) {
726                 ctdb_process_deferred_attach(ctdb);
727         }
728
729         if (ctdb->tunable.verify_recovery_lock == 0) {
730                 /* dont need to verify the reclock file */
731                 ctdb->recovery_mode = recmode;
732                 return 0;
733         }
734
735         /* For the rest of what needs to be done, we need to do this in
736            a child process since 
737            1, the call to ctdb_recovery_lock() can block if the cluster
738               filesystem is in the process of recovery.
739         */
740         ret = pipe(state->fd);
741         if (ret != 0) {
742                 talloc_free(state);
743                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
744                 return -1;
745         }
746
747         state->child = fork();
748         if (state->child == (pid_t)-1) {
749                 close(state->fd[0]);
750                 close(state->fd[1]);
751                 talloc_free(state);
752                 return -1;
753         }
754
755         if (state->child == 0) {
756                 char cc = 0;
757                 close(state->fd[0]);
758
759                 debug_extra = talloc_asprintf(NULL, "set_recmode:");
760                 /* we should not be able to get the lock on the reclock file, 
761                   as it should  be held by the recovery master 
762                 */
763                 if (ctdb_recovery_lock(ctdb, false)) {
764                         DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
765                         cc = 1;
766                 }
767
768                 write(state->fd[1], &cc, 1);
769                 /* make sure we die when our parent dies */
770                 while (kill(parent, 0) == 0 || errno != ESRCH) {
771                         sleep(5);
772                         write(state->fd[1], &cc, 1);
773                 }
774                 _exit(0);
775         }
776         close(state->fd[1]);
777         set_close_on_exec(state->fd[0]);
778
779         state->fd[1] = -1;
780
781         talloc_set_destructor(state, set_recmode_destructor);
782
783         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
784
785         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
786                                     ctdb_set_recmode_timeout, state);
787
788         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
789                                 EVENT_FD_READ,
790                                 set_recmode_handler,
791                                 (void *)state);
792
793         if (state->fde == NULL) {
794                 talloc_free(state);
795                 return -1;
796         }
797         tevent_fd_set_auto_close(state->fde);
798
799         state->ctdb    = ctdb;
800         state->recmode = recmode;
801         state->c       = talloc_steal(state, c);
802
803         *async_reply = true;
804
805         return 0;
806 }
807
808
809 /*
810   try and get the recovery lock in shared storage - should only work
811   on the recovery master recovery daemon. Anywhere else is a bug
812  */
813 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
814 {
815         struct flock lock;
816
817         if (keep) {
818                 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
819         }
820         if (ctdb->recovery_lock_fd != -1) {
821                 close(ctdb->recovery_lock_fd);
822                 ctdb->recovery_lock_fd = -1;
823         }
824
825         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
826         if (ctdb->recovery_lock_fd == -1) {
827                 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n", 
828                          ctdb->recovery_lock_file, strerror(errno)));
829                 return false;
830         }
831
832         set_close_on_exec(ctdb->recovery_lock_fd);
833
834         lock.l_type = F_WRLCK;
835         lock.l_whence = SEEK_SET;
836         lock.l_start = 0;
837         lock.l_len = 1;
838         lock.l_pid = 0;
839
840         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
841                 close(ctdb->recovery_lock_fd);
842                 ctdb->recovery_lock_fd = -1;
843                 if (keep) {
844                         DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
845                 }
846                 return false;
847         }
848
849         if (!keep) {
850                 close(ctdb->recovery_lock_fd);
851                 ctdb->recovery_lock_fd = -1;
852         }
853
854         if (keep) {
855                 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
856         }
857
858         DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
859
860         return true;
861 }
862
863 /*
864   delete a record as part of the vacuum process
865   only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
866   use non-blocking locks
867
868   return 0 if the record was successfully deleted (i.e. it does not exist
869   when the function returns)
870   or !0 is the record still exists in the tdb after returning.
871  */
872 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
873 {
874         TDB_DATA key, data;
875         struct ctdb_ltdb_header *hdr, *hdr2;
876         
877         /* these are really internal tdb functions - but we need them here for
878            non-blocking lock of the freelist */
879         int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
880         int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
881
882
883         key.dsize = rec->keylen;
884         key.dptr  = &rec->data[0];
885         data.dsize = rec->datalen;
886         data.dptr = &rec->data[rec->keylen];
887
888         if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
889                 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
890                 return -1;
891         }
892
893         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
894                 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
895                 return -1;
896         }
897
898         hdr = (struct ctdb_ltdb_header *)data.dptr;
899
900         /* use a non-blocking lock */
901         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
902                 return -1;
903         }
904
905         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
906         if (data.dptr == NULL) {
907                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
908                 return 0;
909         }
910
911         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
912                 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
913                         tdb_delete(ctdb_db->ltdb->tdb, key);
914                         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
915                         DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
916                 }
917                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
918                 free(data.dptr);
919                 return 0;
920         }
921         
922         hdr2 = (struct ctdb_ltdb_header *)data.dptr;
923
924         if (hdr2->rsn > hdr->rsn) {
925                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
926                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
927                          (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
928                 free(data.dptr);
929                 return -1;              
930         }
931
932         if (hdr2->dmaster == ctdb->pnn) {
933                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
934                 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
935                 free(data.dptr);
936                 return -1;                              
937         }
938
939         if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
940                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
941                 free(data.dptr);
942                 return -1;                              
943         }
944
945         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
946                 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
947                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
948                 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
949                 free(data.dptr);
950                 return -1;                                              
951         }
952
953         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
954         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
955         free(data.dptr);
956         return 0;       
957 }
958
959
960
961 struct recovery_callback_state {
962         struct ctdb_req_control *c;
963 };
964
965
966 /*
967   called when the 'recovered' event script has finished
968  */
969 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
970 {
971         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
972
973         ctdb_enable_monitoring(ctdb);
974         CTDB_INCREMENT_STAT(ctdb, num_recoveries);
975
976         if (status != 0) {
977                 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
978                 if (status == -ETIME) {
979                         ctdb_ban_self(ctdb);
980                 }
981         }
982
983         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
984         talloc_free(state);
985
986         gettimeofday(&ctdb->last_recovery_finished, NULL);
987 }
988
989 /*
990   recovery has finished
991  */
992 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb, 
993                                 struct ctdb_req_control *c,
994                                 bool *async_reply)
995 {
996         int ret;
997         struct recovery_callback_state *state;
998
999         DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1000
1001         ctdb_persistent_finish_trans3_commits(ctdb);
1002
1003         state = talloc(ctdb, struct recovery_callback_state);
1004         CTDB_NO_MEMORY(ctdb, state);
1005
1006         state->c    = c;
1007
1008         ctdb_disable_monitoring(ctdb);
1009
1010         ret = ctdb_event_script_callback(ctdb, state,
1011                                          ctdb_end_recovery_callback, 
1012                                          state, 
1013                                          false,
1014                                          CTDB_EVENT_RECOVERED, "%s", "");
1015
1016         if (ret != 0) {
1017                 ctdb_enable_monitoring(ctdb);
1018
1019                 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1020                 talloc_free(state);
1021                 return -1;
1022         }
1023
1024         /* tell the control that we will be reply asynchronously */
1025         state->c    = talloc_steal(state, c);
1026         *async_reply = true;
1027         return 0;
1028 }
1029
1030 /*
1031   called when the 'startrecovery' event script has finished
1032  */
1033 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1034 {
1035         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1036
1037         if (status != 0) {
1038                 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1039         }
1040
1041         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1042         talloc_free(state);
1043 }
1044
1045 /*
1046   run the startrecovery eventscript
1047  */
1048 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb, 
1049                                 struct ctdb_req_control *c,
1050                                 bool *async_reply)
1051 {
1052         int ret;
1053         struct recovery_callback_state *state;
1054
1055         DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1056         gettimeofday(&ctdb->last_recovery_started, NULL);
1057
1058         state = talloc(ctdb, struct recovery_callback_state);
1059         CTDB_NO_MEMORY(ctdb, state);
1060
1061         state->c    = talloc_steal(state, c);
1062
1063         ctdb_disable_monitoring(ctdb);
1064
1065         ret = ctdb_event_script_callback(ctdb, state,
1066                                          ctdb_start_recovery_callback, 
1067                                          state, false,
1068                                          CTDB_EVENT_START_RECOVERY,
1069                                          "%s", "");
1070
1071         if (ret != 0) {
1072                 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1073                 talloc_free(state);
1074                 return -1;
1075         }
1076
1077         /* tell the control that we will be reply asynchronously */
1078         *async_reply = true;
1079         return 0;
1080 }
1081
1082 /*
1083  try to delete all these records as part of the vacuuming process
1084  and return the records we failed to delete
1085 */
1086 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1087 {
1088         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1089         struct ctdb_db_context *ctdb_db;
1090         int i;
1091         struct ctdb_rec_data *rec;
1092         struct ctdb_marshall_buffer *records;
1093
1094         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1095                 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1096                 return -1;
1097         }
1098
1099         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1100         if (!ctdb_db) {
1101                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1102                 return -1;
1103         }
1104
1105
1106         DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1107                  reply->count, reply->db_id));
1108
1109
1110         /* create a blob to send back the records we couldnt delete */  
1111         records = (struct ctdb_marshall_buffer *)
1112                         talloc_zero_size(outdata, 
1113                                     offsetof(struct ctdb_marshall_buffer, data));
1114         if (records == NULL) {
1115                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1116                 return -1;
1117         }
1118         records->db_id = ctdb_db->db_id;
1119
1120
1121         rec = (struct ctdb_rec_data *)&reply->data[0];
1122         for (i=0;i<reply->count;i++) {
1123                 TDB_DATA key, data;
1124
1125                 key.dptr = &rec->data[0];
1126                 key.dsize = rec->keylen;
1127                 data.dptr = &rec->data[key.dsize];
1128                 data.dsize = rec->datalen;
1129
1130                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1131                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1132                         return -1;
1133                 }
1134
1135                 /* If we cant delete the record we must add it to the reply
1136                    so the lmaster knows it may not purge this record
1137                 */
1138                 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1139                         size_t old_size;
1140                         struct ctdb_ltdb_header *hdr;
1141
1142                         hdr = (struct ctdb_ltdb_header *)data.dptr;
1143                         data.dptr += sizeof(*hdr);
1144                         data.dsize -= sizeof(*hdr);
1145
1146                         DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1147
1148                         old_size = talloc_get_size(records);
1149                         records = talloc_realloc_size(outdata, records, old_size + rec->length);
1150                         if (records == NULL) {
1151                                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1152                                 return -1;
1153                         }
1154                         records->count++;
1155                         memcpy(old_size+(uint8_t *)records, rec, rec->length);
1156                 } 
1157
1158                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1159         }           
1160
1161
1162         outdata->dptr = (uint8_t *)records;
1163         outdata->dsize = talloc_get_size(records);
1164
1165         return 0;
1166 }
1167
1168 /*
1169   report capabilities
1170  */
1171 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1172 {
1173         uint32_t *capabilities = NULL;
1174
1175         capabilities = talloc(outdata, uint32_t);
1176         CTDB_NO_MEMORY(ctdb, capabilities);
1177         *capabilities = ctdb->capabilities;
1178
1179         outdata->dsize = sizeof(uint32_t);
1180         outdata->dptr = (uint8_t *)capabilities;
1181
1182         return 0;       
1183 }
1184
1185 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1186 {
1187         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1188         uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1189
1190         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1191
1192         if (*count < ctdb->tunable.recd_ping_failcount) {
1193                 (*count)++;
1194                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1195                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1196                         ctdb_recd_ping_timeout, ctdb);
1197                 return;
1198         }
1199
1200         DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1201
1202         ctdb_stop_recoverd(ctdb);
1203         ctdb_start_recoverd(ctdb);
1204 }
1205
1206 /* The recovery daemon will ping us at regular intervals.
1207    If we havent been pinged for a while we assume the recovery
1208    daemon is inoperable and we shut down.
1209 */
1210 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1211 {
1212         talloc_free(ctdb->recd_ping_count);
1213
1214         ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1215         CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1216
1217         if (ctdb->tunable.recd_ping_timeout != 0) {
1218                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1219                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1220                         ctdb_recd_ping_timeout, ctdb);
1221         }
1222
1223         return 0;
1224 }
1225
1226
1227
1228 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1229 {
1230         CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1231
1232         ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
1233         return 0;
1234 }
1235
1236
1237 struct stop_node_callback_state {
1238         struct ctdb_req_control *c;
1239 };
1240
1241 /*
1242   called when the 'stopped' event script has finished
1243  */
1244 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1245 {
1246         struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
1247
1248         if (status != 0) {
1249                 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1250                 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1251                 if (status == -ETIME) {
1252                         ctdb_ban_self(ctdb);
1253                 }
1254         }
1255
1256         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1257         talloc_free(state);
1258 }
1259
1260 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1261 {
1262         int ret;
1263         struct stop_node_callback_state *state;
1264
1265         DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1266
1267         state = talloc(ctdb, struct stop_node_callback_state);
1268         CTDB_NO_MEMORY(ctdb, state);
1269
1270         state->c    = talloc_steal(state, c);
1271
1272         ctdb_disable_monitoring(ctdb);
1273
1274         ret = ctdb_event_script_callback(ctdb, state,
1275                                          ctdb_stop_node_callback, 
1276                                          state, false,
1277                                          CTDB_EVENT_STOPPED, "%s", "");
1278
1279         if (ret != 0) {
1280                 ctdb_enable_monitoring(ctdb);
1281
1282                 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
1283                 talloc_free(state);
1284                 return -1;
1285         }
1286
1287         ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1288
1289         *async_reply = true;
1290
1291         return 0;
1292 }
1293
1294 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1295 {
1296         DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1297         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1298
1299         return 0;
1300 }