Add a new test tool that fetch locks a record and then blocks until it receives
[sahlberg/ctdb.git] / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "db_wrap.h"
30
31 /*
32   lock all databases - mark only
33  */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb, uint32_t priority)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
39                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
40                 return -1;
41         }
42
43         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
44                 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
45                 return -1;
46         }
47         /* The dual loop is a woraround for older versions of samba
48            that does not yet support the set-db-priority/lock order
49            call. So that we get basic deadlock avoiidance also for
50            these old versions of samba.
51            This code will be removed in the future.
52         */
53         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
54                 if (ctdb_db->priority != priority) {
55                         continue;
56                 }
57                 if (strstr(ctdb_db->db_name, "notify") != NULL) {
58                         continue;
59                 }
60                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
61                         return -1;
62                 }
63                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
64                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
65                         return -1;
66                 }
67         }
68         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
69                 if (ctdb_db->priority != priority) {
70                         continue;
71                 }
72                 if (strstr(ctdb_db->db_name, "notify") == NULL) {
73                         continue;
74                 }
75                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
76                         return -1;
77                 }
78                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
79                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
80                         return -1;
81                 }
82         }
83         return 0;
84 }
85
86 /*
87   lock all databases - unmark only
88  */
89 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb, uint32_t priority)
90 {
91         struct ctdb_db_context *ctdb_db;
92
93         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
94                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
95                 return -1;
96         }
97
98         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
99                 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
100                 return -1;
101         }
102         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
103                 if (ctdb_db->priority != priority) {
104                         continue;
105                 }
106                 tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
107                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
108                         return -1;
109                 }
110         }
111         return 0;
112 }
113
114
115 int 
116 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
117 {
118         CHECK_CONTROL_DATA_SIZE(0);
119         struct ctdb_vnn_map_wire *map;
120         size_t len;
121
122         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
123         map = talloc_size(outdata, len);
124         CTDB_NO_MEMORY(ctdb, map);
125
126         map->generation = ctdb->vnn_map->generation;
127         map->size = ctdb->vnn_map->size;
128         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
129
130         outdata->dsize = len;
131         outdata->dptr  = (uint8_t *)map;
132
133         return 0;
134 }
135
136 int 
137 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
138 {
139         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
140         int i;
141
142         for(i=1; i<=NUM_DB_PRIORITIES; i++) {
143                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
144                         DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
145                         return -1;
146                 }
147         }
148
149         talloc_free(ctdb->vnn_map);
150
151         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
152         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
153
154         ctdb->vnn_map->generation = map->generation;
155         ctdb->vnn_map->size       = map->size;
156         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
157         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
158
159         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
160
161         return 0;
162 }
163
164 int 
165 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
166 {
167         uint32_t i, len;
168         struct ctdb_db_context *ctdb_db;
169         struct ctdb_dbid_map *dbid_map;
170
171         CHECK_CONTROL_DATA_SIZE(0);
172
173         len = 0;
174         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
175                 len++;
176         }
177
178
179         outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
180         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
181         if (!outdata->dptr) {
182                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
183                 exit(1);
184         }
185
186         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
187         dbid_map->num = len;
188         for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
189                 dbid_map->dbs[i].dbid       = ctdb_db->db_id;
190                 dbid_map->dbs[i].persistent = ctdb_db->persistent;
191         }
192
193         return 0;
194 }
195
196 int 
197 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
198 {
199         uint32_t i, num_nodes;
200         struct ctdb_node_map *node_map;
201
202         CHECK_CONTROL_DATA_SIZE(0);
203
204         num_nodes = ctdb->num_nodes;
205
206         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
207         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
208         if (!outdata->dptr) {
209                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
210                 exit(1);
211         }
212
213         node_map = (struct ctdb_node_map *)outdata->dptr;
214         node_map->num = num_nodes;
215         for (i=0; i<num_nodes; i++) {
216                 if (parse_ip(ctdb->nodes[i]->address.address,
217                              NULL, /* TODO: pass in the correct interface here*/
218                              0,
219                              &node_map->nodes[i].addr) == 0)
220                 {
221                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
222                 }
223
224                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
225                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
226         }
227
228         return 0;
229 }
230
231 /*
232    get an old style ipv4-only nodemap
233 */
234 int 
235 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
236 {
237         uint32_t i, num_nodes;
238         struct ctdb_node_mapv4 *node_map;
239
240         CHECK_CONTROL_DATA_SIZE(0);
241
242         num_nodes = ctdb->num_nodes;
243
244         outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
245         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
246         if (!outdata->dptr) {
247                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
248                 exit(1);
249         }
250
251         node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
252         node_map->num = num_nodes;
253         for (i=0; i<num_nodes; i++) {
254                 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
255                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
256                         return -1;
257                 }
258
259                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
260                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
261         }
262
263         return 0;
264 }
265
266 static void
267 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te, 
268                                struct timeval t, void *private_data)
269 {
270         int i, num_nodes;
271         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
272         TALLOC_CTX *tmp_ctx;
273         struct ctdb_node **nodes;       
274
275         tmp_ctx = talloc_new(ctdb);
276
277         /* steal the old nodes file for a while */
278         talloc_steal(tmp_ctx, ctdb->nodes);
279         nodes = ctdb->nodes;
280         ctdb->nodes = NULL;
281         num_nodes = ctdb->num_nodes;
282         ctdb->num_nodes = 0;
283
284         /* load the new nodes file */
285         ctdb_load_nodes_file(ctdb);
286
287         for (i=0; i<ctdb->num_nodes; i++) {
288                 /* keep any identical pre-existing nodes and connections */
289                 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
290                         talloc_free(ctdb->nodes[i]);
291                         ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
292                         continue;
293                 }
294
295                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
296                         continue;
297                 }
298
299                 /* any new or different nodes must be added */
300                 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
301                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
302                         ctdb_fatal(ctdb, "failed to add node. shutting down\n");
303                 }
304                 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
305                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
306                         ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
307                 }
308         }
309
310         /* tell the recovery daemon to reaload the nodes file too */
311         ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
312
313         talloc_free(tmp_ctx);
314         return;
315 }
316
317 /*
318   reload the nodes file after a short delay (so that we can send the response
319   back first
320 */
321 int 
322 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
323 {
324         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
325
326         return 0;
327 }
328
329 /* 
330    a traverse function for pulling all relevent records from pulldb
331  */
332 struct pulldb_data {
333         struct ctdb_context *ctdb;
334         struct ctdb_marshall_buffer *pulldata;
335         uint32_t len;
336         bool failed;
337 };
338
339 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
340 {
341         struct pulldb_data *params = (struct pulldb_data *)p;
342         struct ctdb_rec_data *rec;
343
344         /* add the record to the blob */
345         rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
346         if (rec == NULL) {
347                 params->failed = true;
348                 return -1;
349         }
350         params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
351         if (params->pulldata == NULL) {
352                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
353                 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
354         }
355         params->pulldata->count++;
356         memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
357         params->len += rec->length;
358         talloc_free(rec);
359
360         return 0;
361 }
362
363 /*
364   pul a bunch of records from a ltdb, filtering by lmaster
365  */
366 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
367 {
368         struct ctdb_control_pulldb *pull;
369         struct ctdb_db_context *ctdb_db;
370         struct pulldb_data params;
371         struct ctdb_marshall_buffer *reply;
372
373         pull = (struct ctdb_control_pulldb *)indata.dptr;
374         
375         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
376         if (!ctdb_db) {
377                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
378                 return -1;
379         }
380
381         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
382                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
383                 return -1;
384         }
385
386         reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
387         CTDB_NO_MEMORY(ctdb, reply);
388
389         reply->db_id = pull->db_id;
390
391         params.ctdb = ctdb;
392         params.pulldata = reply;
393         params.len = offsetof(struct ctdb_marshall_buffer, data);
394         params.failed = false;
395
396         if (ctdb_db->unhealthy_reason) {
397                 /* this is just a warning, as the tdb should be empty anyway */
398                 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
399                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
400         }
401
402         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
404                 return -1;
405         }
406
407         if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
408                 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
409                 ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
410                 talloc_free(params.pulldata);
411                 return -1;
412         }
413
414         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
415
416         outdata->dptr = (uint8_t *)params.pulldata;
417         outdata->dsize = params.len;
418
419         return 0;
420 }
421
422 /*
423   push a bunch of records into a ltdb, filtering by rsn
424  */
425 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
426 {
427         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
428         struct ctdb_db_context *ctdb_db;
429         int i, ret;
430         struct ctdb_rec_data *rec;
431
432         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
433                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
434                 return -1;
435         }
436
437         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
438         if (!ctdb_db) {
439                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
440                 return -1;
441         }
442
443         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
444                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
445                 return -1;
446         }
447
448         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
450                 return -1;
451         }
452
453         rec = (struct ctdb_rec_data *)&reply->data[0];
454
455         DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
456                  reply->count, reply->db_id));
457
458         for (i=0;i<reply->count;i++) {
459                 TDB_DATA key, data;
460                 struct ctdb_ltdb_header *hdr;
461
462                 key.dptr = &rec->data[0];
463                 key.dsize = rec->keylen;
464                 data.dptr = &rec->data[key.dsize];
465                 data.dsize = rec->datalen;
466
467                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
468                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
469                         goto failed;
470                 }
471                 hdr = (struct ctdb_ltdb_header *)data.dptr;
472                 data.dptr += sizeof(*hdr);
473                 data.dsize -= sizeof(*hdr);
474
475                 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
478                         goto failed;
479                 }
480
481                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
482         }           
483
484         DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
485                  reply->count, reply->db_id));
486
487         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
488         return 0;
489
490 failed:
491         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
492         return -1;
493 }
494
495
496 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
497 {
498         uint32_t *dmaster = (uint32_t *)p;
499         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
500         int ret;
501
502         /* skip if already correct */
503         if (header->dmaster == *dmaster) {
504                 return 0;
505         }
506
507         header->dmaster = *dmaster;
508
509         ret = tdb_store(tdb, key, data, TDB_REPLACE);
510         if (ret) {
511                 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back  ret:%d\n",ret));
512                 return ret;
513         }
514
515         /* TODO: add error checking here */
516
517         return 0;
518 }
519
520 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
521 {
522         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
523         struct ctdb_db_context *ctdb_db;
524
525         ctdb_db = find_ctdb_db(ctdb, p->db_id);
526         if (!ctdb_db) {
527                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
528                 return -1;
529         }
530
531         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
532                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
533                 return -1;
534         }
535
536         if (ctdb_lock_all_databases_mark(ctdb,  ctdb_db->priority) != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
538                 return -1;
539         }
540
541         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
542
543         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
544         
545         return 0;
546 }
547
548 struct ctdb_set_recmode_state {
549         struct ctdb_context *ctdb;
550         struct ctdb_req_control *c;
551         uint32_t recmode;
552         int fd[2];
553         struct timed_event *te;
554         struct fd_event *fde;
555         pid_t child;
556         struct timeval start_time;
557 };
558
559 /*
560   called if our set_recmode child times out. this would happen if
561   ctdb_recovery_lock() would block.
562  */
563 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te, 
564                                          struct timeval t, void *private_data)
565 {
566         struct ctdb_set_recmode_state *state = talloc_get_type(private_data, 
567                                            struct ctdb_set_recmode_state);
568
569         /* we consider this a success, not a failure, as we failed to
570            set the recovery lock which is what we wanted.  This can be
571            caused by the cluster filesystem being very slow to
572            arbitrate locks immediately after a node failure.       
573          */
574         DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
575         state->ctdb->recovery_mode = state->recmode;
576         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
577         talloc_free(state);
578 }
579
580
581 /* when we free the recmode state we must kill any child process.
582 */
583 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
584 {
585         double l = timeval_elapsed(&state->start_time);
586
587         CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
588
589         if (state->fd[0] != -1) {
590                 state->fd[0] = -1;
591         }
592         if (state->fd[1] != -1) {
593                 state->fd[1] = -1;
594         }
595         kill(state->child, SIGKILL);
596         return 0;
597 }
598
599 /* this is called when the client process has completed ctdb_recovery_lock()
600    and has written data back to us through the pipe.
601 */
602 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde, 
603                              uint16_t flags, void *private_data)
604 {
605         struct ctdb_set_recmode_state *state= talloc_get_type(private_data, 
606                                              struct ctdb_set_recmode_state);
607         char c = 0;
608         int ret;
609
610         /* we got a response from our child process so we can abort the
611            timeout.
612         */
613         talloc_free(state->te);
614         state->te = NULL;
615
616
617         /* read the childs status when trying to lock the reclock file.
618            child wrote 0 if everything is fine and 1 if it did manage
619            to lock the file, which would be a problem since that means
620            we got a request to exit from recovery but we could still lock
621            the file   which at this time SHOULD be locked by the recovery
622            daemon on the recmaster
623         */              
624         ret = read(state->fd[0], &c, 1);
625         if (ret != 1 || c != 0) {
626                 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
627                 talloc_free(state);
628                 return;
629         }
630
631         state->ctdb->recovery_mode = state->recmode;
632
633         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
634         talloc_free(state);
635         return;
636 }
637
638 static void
639 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te, 
640                                struct timeval t, void *private_data)
641 {
642         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
643
644         DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
645         talloc_free(ctdb->release_ips_ctx);
646         ctdb->release_ips_ctx = NULL;
647
648         ctdb_release_all_ips(ctdb);
649 }
650
651 /*
652  * Set up an event to drop all public ips if we remain in recovery for too
653  * long
654  */
655 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
656 {
657         if (ctdb->release_ips_ctx != NULL) {
658                 talloc_free(ctdb->release_ips_ctx);
659         }
660         ctdb->release_ips_ctx = talloc_new(ctdb);
661         CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
662
663         event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
664         return 0;
665 }
666
667 /*
668   set the recovery mode
669  */
670 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
671                                  struct ctdb_req_control *c,
672                                  TDB_DATA indata, bool *async_reply,
673                                  const char **errormsg)
674 {
675         uint32_t recmode = *(uint32_t *)indata.dptr;
676         int i, ret;
677         struct ctdb_set_recmode_state *state;
678         pid_t parent = getpid();
679
680         /* if we enter recovery but stay in recovery for too long
681            we will eventually drop all our ip addresses
682         */
683         if (recmode == CTDB_RECOVERY_NORMAL) {
684                 talloc_free(ctdb->release_ips_ctx);
685                 ctdb->release_ips_ctx = NULL;
686         } else {
687                 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
688                         DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
689                 }
690         }
691
692         if (recmode != ctdb->recovery_mode) {
693                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n", 
694                          recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
695         }
696
697         if (recmode != CTDB_RECOVERY_NORMAL ||
698             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
699                 ctdb->recovery_mode = recmode;
700                 return 0;
701         }
702
703         /* some special handling when ending recovery mode */
704
705         /* force the databases to thaw */
706         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
707                 if (ctdb->freeze_handles[i] != NULL) {
708                         ctdb_control_thaw(ctdb, i);
709                 }
710         }
711
712         state = talloc(ctdb, struct ctdb_set_recmode_state);
713         CTDB_NO_MEMORY(ctdb, state);
714
715         state->start_time = timeval_current();
716         state->fd[0] = -1;
717         state->fd[1] = -1;
718
719         if (ctdb->tunable.verify_recovery_lock == 0) {
720                 /* dont need to verify the reclock file */
721                 ctdb->recovery_mode = recmode;
722                 return 0;
723         }
724
725         /* For the rest of what needs to be done, we need to do this in
726            a child process since 
727            1, the call to ctdb_recovery_lock() can block if the cluster
728               filesystem is in the process of recovery.
729         */
730         ret = pipe(state->fd);
731         if (ret != 0) {
732                 talloc_free(state);
733                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
734                 return -1;
735         }
736
737         state->child = fork();
738         if (state->child == (pid_t)-1) {
739                 close(state->fd[0]);
740                 close(state->fd[1]);
741                 talloc_free(state);
742                 return -1;
743         }
744
745         if (state->child == 0) {
746                 char cc = 0;
747                 close(state->fd[0]);
748
749                 debug_extra = talloc_asprintf(NULL, "set_recmode:");
750                 /* we should not be able to get the lock on the reclock file, 
751                   as it should  be held by the recovery master 
752                 */
753                 if (ctdb_recovery_lock(ctdb, false)) {
754                         DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
755                         cc = 1;
756                 }
757
758                 write(state->fd[1], &cc, 1);
759                 /* make sure we die when our parent dies */
760                 while (kill(parent, 0) == 0 || errno != ESRCH) {
761                         sleep(5);
762                         write(state->fd[1], &cc, 1);
763                 }
764                 _exit(0);
765         }
766         close(state->fd[1]);
767         set_close_on_exec(state->fd[0]);
768
769         state->fd[1] = -1;
770
771         talloc_set_destructor(state, set_recmode_destructor);
772
773         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
774
775         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
776                                     ctdb_set_recmode_timeout, state);
777
778         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
779                                 EVENT_FD_READ,
780                                 set_recmode_handler,
781                                 (void *)state);
782
783         if (state->fde == NULL) {
784                 talloc_free(state);
785                 return -1;
786         }
787         tevent_fd_set_auto_close(state->fde);
788
789         state->ctdb    = ctdb;
790         state->recmode = recmode;
791         state->c       = talloc_steal(state, c);
792
793         *async_reply = true;
794
795         return 0;
796 }
797
798
799 /*
800   try and get the recovery lock in shared storage - should only work
801   on the recovery master recovery daemon. Anywhere else is a bug
802  */
803 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
804 {
805         struct flock lock;
806
807         if (keep) {
808                 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
809         }
810         if (ctdb->recovery_lock_fd != -1) {
811                 close(ctdb->recovery_lock_fd);
812                 ctdb->recovery_lock_fd = -1;
813         }
814
815         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
816         if (ctdb->recovery_lock_fd == -1) {
817                 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n", 
818                          ctdb->recovery_lock_file, strerror(errno)));
819                 return false;
820         }
821
822         set_close_on_exec(ctdb->recovery_lock_fd);
823
824         lock.l_type = F_WRLCK;
825         lock.l_whence = SEEK_SET;
826         lock.l_start = 0;
827         lock.l_len = 1;
828         lock.l_pid = 0;
829
830         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
831                 close(ctdb->recovery_lock_fd);
832                 ctdb->recovery_lock_fd = -1;
833                 if (keep) {
834                         DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
835                 }
836                 return false;
837         }
838
839         if (!keep) {
840                 close(ctdb->recovery_lock_fd);
841                 ctdb->recovery_lock_fd = -1;
842         }
843
844         if (keep) {
845                 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
846         }
847
848         DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
849
850         return true;
851 }
852
853 /*
854   delete a record as part of the vacuum process
855   only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
856   use non-blocking locks
857
858   return 0 if the record was successfully deleted (i.e. it does not exist
859   when the function returns)
860   or !0 is the record still exists in the tdb after returning.
861  */
862 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
863 {
864         TDB_DATA key, data;
865         struct ctdb_ltdb_header *hdr, *hdr2;
866         
867         /* these are really internal tdb functions - but we need them here for
868            non-blocking lock of the freelist */
869         int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
870         int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
871
872
873         key.dsize = rec->keylen;
874         key.dptr  = &rec->data[0];
875         data.dsize = rec->datalen;
876         data.dptr = &rec->data[rec->keylen];
877
878         if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
879                 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
880                 return -1;
881         }
882
883         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
884                 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
885                 return -1;
886         }
887
888         hdr = (struct ctdb_ltdb_header *)data.dptr;
889
890         /* use a non-blocking lock */
891         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
892                 return -1;
893         }
894
895         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
896         if (data.dptr == NULL) {
897                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
898                 return 0;
899         }
900
901         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
902                 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
903                         tdb_delete(ctdb_db->ltdb->tdb, key);
904                         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
905                         DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
906                 }
907                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
908                 free(data.dptr);
909                 return 0;
910         }
911         
912         hdr2 = (struct ctdb_ltdb_header *)data.dptr;
913
914         if (hdr2->rsn > hdr->rsn) {
915                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
916                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
917                          (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
918                 free(data.dptr);
919                 return -1;              
920         }
921
922         if (hdr2->dmaster == ctdb->pnn) {
923                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
924                 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
925                 free(data.dptr);
926                 return -1;                              
927         }
928
929         if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
930                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
931                 free(data.dptr);
932                 return -1;                              
933         }
934
935         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
936                 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
937                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
938                 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
939                 free(data.dptr);
940                 return -1;                                              
941         }
942
943         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
944         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
945         free(data.dptr);
946         return 0;       
947 }
948
949
950
951 struct recovery_callback_state {
952         struct ctdb_req_control *c;
953 };
954
955
956 /*
957   called when the 'recovered' event script has finished
958  */
959 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
960 {
961         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
962
963         ctdb_enable_monitoring(ctdb);
964         CTDB_INCREMENT_STAT(ctdb, num_recoveries);
965
966         if (status != 0) {
967                 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
968                 if (status == -ETIME) {
969                         ctdb_ban_self(ctdb);
970                 }
971         }
972
973         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
974         talloc_free(state);
975
976         gettimeofday(&ctdb->last_recovery_finished, NULL);
977 }
978
979 /*
980   recovery has finished
981  */
982 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb, 
983                                 struct ctdb_req_control *c,
984                                 bool *async_reply)
985 {
986         int ret;
987         struct recovery_callback_state *state;
988
989         DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
990
991         state = talloc(ctdb, struct recovery_callback_state);
992         CTDB_NO_MEMORY(ctdb, state);
993
994         state->c    = c;
995
996         ctdb_disable_monitoring(ctdb);
997
998         ret = ctdb_event_script_callback(ctdb, state,
999                                          ctdb_end_recovery_callback, 
1000                                          state, 
1001                                          false,
1002                                          CTDB_EVENT_RECOVERED, "%s", "");
1003
1004         if (ret != 0) {
1005                 ctdb_enable_monitoring(ctdb);
1006
1007                 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1008                 talloc_free(state);
1009                 return -1;
1010         }
1011
1012         /* tell the control that we will be reply asynchronously */
1013         state->c    = talloc_steal(state, c);
1014         *async_reply = true;
1015         return 0;
1016 }
1017
1018 /*
1019   called when the 'startrecovery' event script has finished
1020  */
1021 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1022 {
1023         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1024
1025         if (status != 0) {
1026                 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1027         }
1028
1029         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1030         talloc_free(state);
1031 }
1032
1033 /*
1034   run the startrecovery eventscript
1035  */
1036 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb, 
1037                                 struct ctdb_req_control *c,
1038                                 bool *async_reply)
1039 {
1040         int ret;
1041         struct recovery_callback_state *state;
1042
1043         DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1044         gettimeofday(&ctdb->last_recovery_started, NULL);
1045
1046         state = talloc(ctdb, struct recovery_callback_state);
1047         CTDB_NO_MEMORY(ctdb, state);
1048
1049         state->c    = talloc_steal(state, c);
1050
1051         ctdb_disable_monitoring(ctdb);
1052
1053         ret = ctdb_event_script_callback(ctdb, state,
1054                                          ctdb_start_recovery_callback, 
1055                                          state, false,
1056                                          CTDB_EVENT_START_RECOVERY,
1057                                          "%s", "");
1058
1059         if (ret != 0) {
1060                 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1061                 talloc_free(state);
1062                 return -1;
1063         }
1064
1065         /* tell the control that we will be reply asynchronously */
1066         *async_reply = true;
1067         return 0;
1068 }
1069
1070 /*
1071  try to delete all these records as part of the vacuuming process
1072  and return the records we failed to delete
1073 */
1074 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1075 {
1076         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1077         struct ctdb_db_context *ctdb_db;
1078         int i;
1079         struct ctdb_rec_data *rec;
1080         struct ctdb_marshall_buffer *records;
1081
1082         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1083                 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1084                 return -1;
1085         }
1086
1087         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1088         if (!ctdb_db) {
1089                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1090                 return -1;
1091         }
1092
1093
1094         DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1095                  reply->count, reply->db_id));
1096
1097
1098         /* create a blob to send back the records we couldnt delete */  
1099         records = (struct ctdb_marshall_buffer *)
1100                         talloc_zero_size(outdata, 
1101                                     offsetof(struct ctdb_marshall_buffer, data));
1102         if (records == NULL) {
1103                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1104                 return -1;
1105         }
1106         records->db_id = ctdb_db->db_id;
1107
1108
1109         rec = (struct ctdb_rec_data *)&reply->data[0];
1110         for (i=0;i<reply->count;i++) {
1111                 TDB_DATA key, data;
1112
1113                 key.dptr = &rec->data[0];
1114                 key.dsize = rec->keylen;
1115                 data.dptr = &rec->data[key.dsize];
1116                 data.dsize = rec->datalen;
1117
1118                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1119                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1120                         return -1;
1121                 }
1122
1123                 /* If we cant delete the record we must add it to the reply
1124                    so the lmaster knows it may not purge this record
1125                 */
1126                 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1127                         size_t old_size;
1128                         struct ctdb_ltdb_header *hdr;
1129
1130                         hdr = (struct ctdb_ltdb_header *)data.dptr;
1131                         data.dptr += sizeof(*hdr);
1132                         data.dsize -= sizeof(*hdr);
1133
1134                         DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1135
1136                         old_size = talloc_get_size(records);
1137                         records = talloc_realloc_size(outdata, records, old_size + rec->length);
1138                         if (records == NULL) {
1139                                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1140                                 return -1;
1141                         }
1142                         records->count++;
1143                         memcpy(old_size+(uint8_t *)records, rec, rec->length);
1144                 } 
1145
1146                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1147         }           
1148
1149
1150         outdata->dptr = (uint8_t *)records;
1151         outdata->dsize = talloc_get_size(records);
1152
1153         return 0;
1154 }
1155
1156 /*
1157   report capabilities
1158  */
1159 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1160 {
1161         uint32_t *capabilities = NULL;
1162
1163         capabilities = talloc(outdata, uint32_t);
1164         CTDB_NO_MEMORY(ctdb, capabilities);
1165         *capabilities = ctdb->capabilities;
1166
1167         outdata->dsize = sizeof(uint32_t);
1168         outdata->dptr = (uint8_t *)capabilities;
1169
1170         return 0;       
1171 }
1172
1173 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1174 {
1175         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1176         uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1177
1178         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1179
1180         if (*count < ctdb->tunable.recd_ping_failcount) {
1181                 (*count)++;
1182                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1183                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1184                         ctdb_recd_ping_timeout, ctdb);
1185                 return;
1186         }
1187
1188         DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Shutting down ctdb daemon. (This can be caused if the cluster filesystem has hung)\n"));
1189
1190         ctdb_stop_recoverd(ctdb);
1191         ctdb_stop_keepalive(ctdb);
1192         ctdb_stop_monitoring(ctdb);
1193         ctdb_release_all_ips(ctdb);
1194         if (ctdb->methods != NULL) {
1195                 ctdb->methods->shutdown(ctdb);
1196         }
1197         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1198         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Daemon has been shut down.\n"));
1199         exit(0);
1200 }
1201
1202 /* The recovery daemon will ping us at regular intervals.
1203    If we havent been pinged for a while we assume the recovery
1204    daemon is inoperable and we shut down.
1205 */
1206 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1207 {
1208         talloc_free(ctdb->recd_ping_count);
1209
1210         ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1211         CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1212
1213         if (ctdb->tunable.recd_ping_timeout != 0) {
1214                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1215                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1216                         ctdb_recd_ping_timeout, ctdb);
1217         }
1218
1219         return 0;
1220 }
1221
1222
1223
1224 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1225 {
1226         CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1227
1228         ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
1229         return 0;
1230 }
1231
1232
1233 struct stop_node_callback_state {
1234         struct ctdb_req_control *c;
1235 };
1236
1237 /*
1238   called when the 'stopped' event script has finished
1239  */
1240 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1241 {
1242         struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
1243
1244         if (status != 0) {
1245                 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1246                 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1247                 if (status == -ETIME) {
1248                         ctdb_ban_self(ctdb);
1249                 }
1250         }
1251
1252         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1253         talloc_free(state);
1254 }
1255
1256 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1257 {
1258         int ret;
1259         struct stop_node_callback_state *state;
1260
1261         DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1262
1263         state = talloc(ctdb, struct stop_node_callback_state);
1264         CTDB_NO_MEMORY(ctdb, state);
1265
1266         state->c    = talloc_steal(state, c);
1267
1268         ctdb_disable_monitoring(ctdb);
1269
1270         ret = ctdb_event_script_callback(ctdb, state,
1271                                          ctdb_stop_node_callback, 
1272                                          state, false,
1273                                          CTDB_EVENT_STOPPED, "%s", "");
1274
1275         if (ret != 0) {
1276                 ctdb_enable_monitoring(ctdb);
1277
1278                 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
1279                 talloc_free(state);
1280                 return -1;
1281         }
1282
1283         ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1284
1285         *async_reply = true;
1286
1287         return 0;
1288 }
1289
1290 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1291 {
1292         DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1293         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1294
1295         return 0;
1296 }