ReadOnly: After performing a recovery, clear out all flags related to readonly delega...
[kamenim/samba-autobuild/.git] / ctdb / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "db_wrap.h"
30
31 /*
32   lock all databases - mark only
33  */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb, uint32_t priority)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
39                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
40                 return -1;
41         }
42
43         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
44                 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
45                 return -1;
46         }
47         /* The dual loop is a woraround for older versions of samba
48            that does not yet support the set-db-priority/lock order
49            call. So that we get basic deadlock avoiidance also for
50            these old versions of samba.
51            This code will be removed in the future.
52         */
53         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
54                 if (ctdb_db->priority != priority) {
55                         continue;
56                 }
57                 if (strstr(ctdb_db->db_name, "notify") != NULL) {
58                         continue;
59                 }
60                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
61                         return -1;
62                 }
63                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
64                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
65                         return -1;
66                 }
67         }
68         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
69                 if (ctdb_db->priority != priority) {
70                         continue;
71                 }
72                 if (strstr(ctdb_db->db_name, "notify") == NULL) {
73                         continue;
74                 }
75                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
76                         return -1;
77                 }
78                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
79                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
80                         return -1;
81                 }
82         }
83         return 0;
84 }
85
86 /*
87   lock all databases - unmark only
88  */
89 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb, uint32_t priority)
90 {
91         struct ctdb_db_context *ctdb_db;
92
93         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
94                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
95                 return -1;
96         }
97
98         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
99                 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
100                 return -1;
101         }
102         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
103                 if (ctdb_db->priority != priority) {
104                         continue;
105                 }
106                 tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
107                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
108                         return -1;
109                 }
110         }
111         return 0;
112 }
113
114
115 int 
116 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
117 {
118         CHECK_CONTROL_DATA_SIZE(0);
119         struct ctdb_vnn_map_wire *map;
120         size_t len;
121
122         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
123         map = talloc_size(outdata, len);
124         CTDB_NO_MEMORY(ctdb, map);
125
126         map->generation = ctdb->vnn_map->generation;
127         map->size = ctdb->vnn_map->size;
128         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
129
130         outdata->dsize = len;
131         outdata->dptr  = (uint8_t *)map;
132
133         return 0;
134 }
135
136 int 
137 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
138 {
139         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
140         int i;
141
142         for(i=1; i<=NUM_DB_PRIORITIES; i++) {
143                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
144                         DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
145                         return -1;
146                 }
147         }
148
149         talloc_free(ctdb->vnn_map);
150
151         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
152         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
153
154         ctdb->vnn_map->generation = map->generation;
155         ctdb->vnn_map->size       = map->size;
156         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
157         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
158
159         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
160
161         return 0;
162 }
163
164 int 
165 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
166 {
167         uint32_t i, len;
168         struct ctdb_db_context *ctdb_db;
169         struct ctdb_dbid_map *dbid_map;
170
171         CHECK_CONTROL_DATA_SIZE(0);
172
173         len = 0;
174         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
175                 len++;
176         }
177
178
179         outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
180         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
181         if (!outdata->dptr) {
182                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
183                 exit(1);
184         }
185
186         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
187         dbid_map->num = len;
188         for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
189                 dbid_map->dbs[i].dbid       = ctdb_db->db_id;
190                 dbid_map->dbs[i].persistent = ctdb_db->persistent;
191         }
192
193         return 0;
194 }
195
196 int 
197 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
198 {
199         uint32_t i, num_nodes;
200         struct ctdb_node_map *node_map;
201
202         CHECK_CONTROL_DATA_SIZE(0);
203
204         num_nodes = ctdb->num_nodes;
205
206         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
207         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
208         if (!outdata->dptr) {
209                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
210                 exit(1);
211         }
212
213         node_map = (struct ctdb_node_map *)outdata->dptr;
214         node_map->num = num_nodes;
215         for (i=0; i<num_nodes; i++) {
216                 if (parse_ip(ctdb->nodes[i]->address.address,
217                              NULL, /* TODO: pass in the correct interface here*/
218                              0,
219                              &node_map->nodes[i].addr) == 0)
220                 {
221                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
222                 }
223
224                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
225                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
226         }
227
228         return 0;
229 }
230
231 /*
232    get an old style ipv4-only nodemap
233 */
234 int 
235 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
236 {
237         uint32_t i, num_nodes;
238         struct ctdb_node_mapv4 *node_map;
239
240         CHECK_CONTROL_DATA_SIZE(0);
241
242         num_nodes = ctdb->num_nodes;
243
244         outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
245         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
246         if (!outdata->dptr) {
247                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
248                 exit(1);
249         }
250
251         node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
252         node_map->num = num_nodes;
253         for (i=0; i<num_nodes; i++) {
254                 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
255                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
256                         return -1;
257                 }
258
259                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
260                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
261         }
262
263         return 0;
264 }
265
266 static void
267 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te, 
268                                struct timeval t, void *private_data)
269 {
270         int i, num_nodes;
271         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
272         TALLOC_CTX *tmp_ctx;
273         struct ctdb_node **nodes;       
274
275         tmp_ctx = talloc_new(ctdb);
276
277         /* steal the old nodes file for a while */
278         talloc_steal(tmp_ctx, ctdb->nodes);
279         nodes = ctdb->nodes;
280         ctdb->nodes = NULL;
281         num_nodes = ctdb->num_nodes;
282         ctdb->num_nodes = 0;
283
284         /* load the new nodes file */
285         ctdb_load_nodes_file(ctdb);
286
287         for (i=0; i<ctdb->num_nodes; i++) {
288                 /* keep any identical pre-existing nodes and connections */
289                 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
290                         talloc_free(ctdb->nodes[i]);
291                         ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
292                         continue;
293                 }
294
295                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
296                         continue;
297                 }
298
299                 /* any new or different nodes must be added */
300                 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
301                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
302                         ctdb_fatal(ctdb, "failed to add node. shutting down\n");
303                 }
304                 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
305                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
306                         ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
307                 }
308         }
309
310         /* tell the recovery daemon to reaload the nodes file too */
311         ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
312
313         talloc_free(tmp_ctx);
314         return;
315 }
316
317 /*
318   reload the nodes file after a short delay (so that we can send the response
319   back first
320 */
321 int 
322 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
323 {
324         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
325
326         return 0;
327 }
328
329 /* 
330    a traverse function for pulling all relevent records from pulldb
331  */
332 struct pulldb_data {
333         struct ctdb_context *ctdb;
334         struct ctdb_marshall_buffer *pulldata;
335         uint32_t len;
336         bool failed;
337 };
338
339 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
340 {
341         struct pulldb_data *params = (struct pulldb_data *)p;
342         struct ctdb_rec_data *rec;
343
344         /* add the record to the blob */
345         rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
346         if (rec == NULL) {
347                 params->failed = true;
348                 return -1;
349         }
350         params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
351         if (params->pulldata == NULL) {
352                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
353                 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
354         }
355         params->pulldata->count++;
356         memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
357         params->len += rec->length;
358         talloc_free(rec);
359
360         return 0;
361 }
362
363 /*
364   pul a bunch of records from a ltdb, filtering by lmaster
365  */
366 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
367 {
368         struct ctdb_control_pulldb *pull;
369         struct ctdb_db_context *ctdb_db;
370         struct pulldb_data params;
371         struct ctdb_marshall_buffer *reply;
372
373         pull = (struct ctdb_control_pulldb *)indata.dptr;
374         
375         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
376         if (!ctdb_db) {
377                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
378                 return -1;
379         }
380
381         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
382                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
383                 return -1;
384         }
385
386         reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
387         CTDB_NO_MEMORY(ctdb, reply);
388
389         reply->db_id = pull->db_id;
390
391         params.ctdb = ctdb;
392         params.pulldata = reply;
393         params.len = offsetof(struct ctdb_marshall_buffer, data);
394         params.failed = false;
395
396         if (ctdb_db->unhealthy_reason) {
397                 /* this is just a warning, as the tdb should be empty anyway */
398                 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
399                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
400         }
401
402         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
404                 return -1;
405         }
406
407         if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
408                 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
409                 ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
410                 talloc_free(params.pulldata);
411                 return -1;
412         }
413
414         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
415
416         outdata->dptr = (uint8_t *)params.pulldata;
417         outdata->dsize = params.len;
418
419         return 0;
420 }
421
422 /*
423   push a bunch of records into a ltdb, filtering by rsn
424  */
425 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
426 {
427         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
428         struct ctdb_db_context *ctdb_db;
429         int i, ret;
430         struct ctdb_rec_data *rec;
431
432         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
433                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
434                 return -1;
435         }
436
437         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
438         if (!ctdb_db) {
439                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
440                 return -1;
441         }
442
443         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
444                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
445                 return -1;
446         }
447
448         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
450                 return -1;
451         }
452
453         rec = (struct ctdb_rec_data *)&reply->data[0];
454
455         DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
456                  reply->count, reply->db_id));
457
458         for (i=0;i<reply->count;i++) {
459                 TDB_DATA key, data;
460                 struct ctdb_ltdb_header *hdr;
461
462                 key.dptr = &rec->data[0];
463                 key.dsize = rec->keylen;
464                 data.dptr = &rec->data[key.dsize];
465                 data.dsize = rec->datalen;
466
467                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
468                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
469                         goto failed;
470                 }
471                 hdr = (struct ctdb_ltdb_header *)data.dptr;
472                 /* strip off any read only record flags. All readonly records
473                    are revoked implicitely by a recovery
474                 */
475                 hdr->flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
476
477                 data.dptr += sizeof(*hdr);
478                 data.dsize -= sizeof(*hdr);
479
480                 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
481                 if (ret != 0) {
482                         DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
483                         goto failed;
484                 }
485
486                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
487         }           
488
489         DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
490                  reply->count, reply->db_id));
491
492         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
493         return 0;
494
495 failed:
496         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
497         return -1;
498 }
499
500
501 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
502 {
503         uint32_t *dmaster = (uint32_t *)p;
504         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
505         int ret;
506
507         /* skip if already correct */
508         if (header->dmaster == *dmaster) {
509                 return 0;
510         }
511
512         header->dmaster = *dmaster;
513
514         ret = tdb_store(tdb, key, data, TDB_REPLACE);
515         if (ret) {
516                 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back  ret:%d\n",ret));
517                 return ret;
518         }
519
520         /* TODO: add error checking here */
521
522         return 0;
523 }
524
525 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
526 {
527         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
528         struct ctdb_db_context *ctdb_db;
529
530         ctdb_db = find_ctdb_db(ctdb, p->db_id);
531         if (!ctdb_db) {
532                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
533                 return -1;
534         }
535
536         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
537                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
538                 return -1;
539         }
540
541         if (ctdb_lock_all_databases_mark(ctdb,  ctdb_db->priority) != 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
543                 return -1;
544         }
545
546         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
547
548         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
549         
550         return 0;
551 }
552
553 struct ctdb_set_recmode_state {
554         struct ctdb_context *ctdb;
555         struct ctdb_req_control *c;
556         uint32_t recmode;
557         int fd[2];
558         struct timed_event *te;
559         struct fd_event *fde;
560         pid_t child;
561         struct timeval start_time;
562 };
563
564 /*
565   called if our set_recmode child times out. this would happen if
566   ctdb_recovery_lock() would block.
567  */
568 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te, 
569                                          struct timeval t, void *private_data)
570 {
571         struct ctdb_set_recmode_state *state = talloc_get_type(private_data, 
572                                            struct ctdb_set_recmode_state);
573
574         /* we consider this a success, not a failure, as we failed to
575            set the recovery lock which is what we wanted.  This can be
576            caused by the cluster filesystem being very slow to
577            arbitrate locks immediately after a node failure.       
578          */
579         DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
580         state->ctdb->recovery_mode = state->recmode;
581         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
582         talloc_free(state);
583 }
584
585
586 /* when we free the recmode state we must kill any child process.
587 */
588 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
589 {
590         double l = timeval_elapsed(&state->start_time);
591
592         CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
593
594         if (state->fd[0] != -1) {
595                 state->fd[0] = -1;
596         }
597         if (state->fd[1] != -1) {
598                 state->fd[1] = -1;
599         }
600         kill(state->child, SIGKILL);
601         return 0;
602 }
603
604 /* this is called when the client process has completed ctdb_recovery_lock()
605    and has written data back to us through the pipe.
606 */
607 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde, 
608                              uint16_t flags, void *private_data)
609 {
610         struct ctdb_set_recmode_state *state= talloc_get_type(private_data, 
611                                              struct ctdb_set_recmode_state);
612         char c = 0;
613         int ret;
614
615         /* we got a response from our child process so we can abort the
616            timeout.
617         */
618         talloc_free(state->te);
619         state->te = NULL;
620
621
622         /* read the childs status when trying to lock the reclock file.
623            child wrote 0 if everything is fine and 1 if it did manage
624            to lock the file, which would be a problem since that means
625            we got a request to exit from recovery but we could still lock
626            the file   which at this time SHOULD be locked by the recovery
627            daemon on the recmaster
628         */              
629         ret = read(state->fd[0], &c, 1);
630         if (ret != 1 || c != 0) {
631                 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
632                 talloc_free(state);
633                 return;
634         }
635
636         state->ctdb->recovery_mode = state->recmode;
637
638         /* release any deferred attach calls from clients */
639         if (state->recmode == CTDB_RECOVERY_NORMAL) {
640                 ctdb_process_deferred_attach(state->ctdb);
641         }
642
643         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
644         talloc_free(state);
645         return;
646 }
647
648 static void
649 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te, 
650                                struct timeval t, void *private_data)
651 {
652         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
653
654         DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
655         talloc_free(ctdb->release_ips_ctx);
656         ctdb->release_ips_ctx = NULL;
657
658         ctdb_release_all_ips(ctdb);
659 }
660
661 /*
662  * Set up an event to drop all public ips if we remain in recovery for too
663  * long
664  */
665 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
666 {
667         if (ctdb->release_ips_ctx != NULL) {
668                 talloc_free(ctdb->release_ips_ctx);
669         }
670         ctdb->release_ips_ctx = talloc_new(ctdb);
671         CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
672
673         event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
674         return 0;
675 }
676
677 /*
678   set the recovery mode
679  */
680 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
681                                  struct ctdb_req_control *c,
682                                  TDB_DATA indata, bool *async_reply,
683                                  const char **errormsg)
684 {
685         uint32_t recmode = *(uint32_t *)indata.dptr;
686         int i, ret;
687         struct ctdb_set_recmode_state *state;
688         pid_t parent = getpid();
689
690         /* if we enter recovery but stay in recovery for too long
691            we will eventually drop all our ip addresses
692         */
693         if (recmode == CTDB_RECOVERY_NORMAL) {
694                 talloc_free(ctdb->release_ips_ctx);
695                 ctdb->release_ips_ctx = NULL;
696         } else {
697                 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
698                         DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
699                 }
700         }
701
702         if (recmode != ctdb->recovery_mode) {
703                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n", 
704                          recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
705         }
706
707         if (recmode != CTDB_RECOVERY_NORMAL ||
708             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
709                 ctdb->recovery_mode = recmode;
710                 return 0;
711         }
712
713         /* some special handling when ending recovery mode */
714
715         /* force the databases to thaw */
716         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
717                 if (ctdb->freeze_handles[i] != NULL) {
718                         ctdb_control_thaw(ctdb, i);
719                 }
720         }
721
722         state = talloc(ctdb, struct ctdb_set_recmode_state);
723         CTDB_NO_MEMORY(ctdb, state);
724
725         state->start_time = timeval_current();
726         state->fd[0] = -1;
727         state->fd[1] = -1;
728
729         /* release any deferred attach calls from clients */
730         if (recmode == CTDB_RECOVERY_NORMAL) {
731                 ctdb_process_deferred_attach(ctdb);
732         }
733
734         if (ctdb->tunable.verify_recovery_lock == 0) {
735                 /* dont need to verify the reclock file */
736                 ctdb->recovery_mode = recmode;
737                 return 0;
738         }
739
740         /* For the rest of what needs to be done, we need to do this in
741            a child process since 
742            1, the call to ctdb_recovery_lock() can block if the cluster
743               filesystem is in the process of recovery.
744         */
745         ret = pipe(state->fd);
746         if (ret != 0) {
747                 talloc_free(state);
748                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
749                 return -1;
750         }
751
752         state->child = fork();
753         if (state->child == (pid_t)-1) {
754                 close(state->fd[0]);
755                 close(state->fd[1]);
756                 talloc_free(state);
757                 return -1;
758         }
759
760         if (state->child == 0) {
761                 char cc = 0;
762                 close(state->fd[0]);
763
764                 debug_extra = talloc_asprintf(NULL, "set_recmode:");
765                 /* we should not be able to get the lock on the reclock file, 
766                   as it should  be held by the recovery master 
767                 */
768                 if (ctdb_recovery_lock(ctdb, false)) {
769                         DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
770                         cc = 1;
771                 }
772
773                 write(state->fd[1], &cc, 1);
774                 /* make sure we die when our parent dies */
775                 while (kill(parent, 0) == 0 || errno != ESRCH) {
776                         sleep(5);
777                         write(state->fd[1], &cc, 1);
778                 }
779                 _exit(0);
780         }
781         close(state->fd[1]);
782         set_close_on_exec(state->fd[0]);
783
784         state->fd[1] = -1;
785
786         talloc_set_destructor(state, set_recmode_destructor);
787
788         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
789
790         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
791                                     ctdb_set_recmode_timeout, state);
792
793         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
794                                 EVENT_FD_READ,
795                                 set_recmode_handler,
796                                 (void *)state);
797
798         if (state->fde == NULL) {
799                 talloc_free(state);
800                 return -1;
801         }
802         tevent_fd_set_auto_close(state->fde);
803
804         state->ctdb    = ctdb;
805         state->recmode = recmode;
806         state->c       = talloc_steal(state, c);
807
808         *async_reply = true;
809
810         return 0;
811 }
812
813
814 /*
815   try and get the recovery lock in shared storage - should only work
816   on the recovery master recovery daemon. Anywhere else is a bug
817  */
818 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
819 {
820         struct flock lock;
821
822         if (keep) {
823                 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
824         }
825         if (ctdb->recovery_lock_fd != -1) {
826                 close(ctdb->recovery_lock_fd);
827                 ctdb->recovery_lock_fd = -1;
828         }
829
830         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
831         if (ctdb->recovery_lock_fd == -1) {
832                 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n", 
833                          ctdb->recovery_lock_file, strerror(errno)));
834                 return false;
835         }
836
837         set_close_on_exec(ctdb->recovery_lock_fd);
838
839         lock.l_type = F_WRLCK;
840         lock.l_whence = SEEK_SET;
841         lock.l_start = 0;
842         lock.l_len = 1;
843         lock.l_pid = 0;
844
845         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
846                 close(ctdb->recovery_lock_fd);
847                 ctdb->recovery_lock_fd = -1;
848                 if (keep) {
849                         DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
850                 }
851                 return false;
852         }
853
854         if (!keep) {
855                 close(ctdb->recovery_lock_fd);
856                 ctdb->recovery_lock_fd = -1;
857         }
858
859         if (keep) {
860                 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
861         }
862
863         DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
864
865         return true;
866 }
867
868 /*
869   delete a record as part of the vacuum process
870   only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
871   use non-blocking locks
872
873   return 0 if the record was successfully deleted (i.e. it does not exist
874   when the function returns)
875   or !0 is the record still exists in the tdb after returning.
876  */
877 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
878 {
879         TDB_DATA key, data;
880         struct ctdb_ltdb_header *hdr, *hdr2;
881         
882         /* these are really internal tdb functions - but we need them here for
883            non-blocking lock of the freelist */
884         int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
885         int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
886
887
888         key.dsize = rec->keylen;
889         key.dptr  = &rec->data[0];
890         data.dsize = rec->datalen;
891         data.dptr = &rec->data[rec->keylen];
892
893         if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
894                 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
895                 return -1;
896         }
897
898         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
899                 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
900                 return -1;
901         }
902
903         hdr = (struct ctdb_ltdb_header *)data.dptr;
904
905         /* use a non-blocking lock */
906         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
907                 return -1;
908         }
909
910         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
911         if (data.dptr == NULL) {
912                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
913                 return 0;
914         }
915
916         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
917                 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
918                         tdb_delete(ctdb_db->ltdb->tdb, key);
919                         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
920                         DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
921                 }
922                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
923                 free(data.dptr);
924                 return 0;
925         }
926         
927         hdr2 = (struct ctdb_ltdb_header *)data.dptr;
928
929         if (hdr2->rsn > hdr->rsn) {
930                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
931                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
932                          (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
933                 free(data.dptr);
934                 return -1;              
935         }
936
937         if (hdr2->dmaster == ctdb->pnn) {
938                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
939                 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
940                 free(data.dptr);
941                 return -1;                              
942         }
943
944         if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
945                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
946                 free(data.dptr);
947                 return -1;                              
948         }
949
950         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
951                 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
952                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
953                 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
954                 free(data.dptr);
955                 return -1;                                              
956         }
957
958         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
959         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
960         free(data.dptr);
961         return 0;       
962 }
963
964
965
966 struct recovery_callback_state {
967         struct ctdb_req_control *c;
968 };
969
970
971 /*
972   called when the 'recovered' event script has finished
973  */
974 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
975 {
976         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
977
978         ctdb_enable_monitoring(ctdb);
979         CTDB_INCREMENT_STAT(ctdb, num_recoveries);
980
981         if (status != 0) {
982                 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
983                 if (status == -ETIME) {
984                         ctdb_ban_self(ctdb);
985                 }
986         }
987
988         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
989         talloc_free(state);
990
991         gettimeofday(&ctdb->last_recovery_finished, NULL);
992 }
993
994 /*
995   recovery has finished
996  */
997 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb, 
998                                 struct ctdb_req_control *c,
999                                 bool *async_reply)
1000 {
1001         int ret;
1002         struct recovery_callback_state *state;
1003
1004         DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1005
1006         ctdb_persistent_finish_trans3_commits(ctdb);
1007
1008         state = talloc(ctdb, struct recovery_callback_state);
1009         CTDB_NO_MEMORY(ctdb, state);
1010
1011         state->c    = c;
1012
1013         ctdb_disable_monitoring(ctdb);
1014
1015         ret = ctdb_event_script_callback(ctdb, state,
1016                                          ctdb_end_recovery_callback, 
1017                                          state, 
1018                                          false,
1019                                          CTDB_EVENT_RECOVERED, "%s", "");
1020
1021         if (ret != 0) {
1022                 ctdb_enable_monitoring(ctdb);
1023
1024                 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1025                 talloc_free(state);
1026                 return -1;
1027         }
1028
1029         /* tell the control that we will be reply asynchronously */
1030         state->c    = talloc_steal(state, c);
1031         *async_reply = true;
1032         return 0;
1033 }
1034
1035 /*
1036   called when the 'startrecovery' event script has finished
1037  */
1038 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1039 {
1040         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1041
1042         if (status != 0) {
1043                 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1044         }
1045
1046         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1047         talloc_free(state);
1048 }
1049
1050 /*
1051   run the startrecovery eventscript
1052  */
1053 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb, 
1054                                 struct ctdb_req_control *c,
1055                                 bool *async_reply)
1056 {
1057         int ret;
1058         struct recovery_callback_state *state;
1059
1060         DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1061         gettimeofday(&ctdb->last_recovery_started, NULL);
1062
1063         state = talloc(ctdb, struct recovery_callback_state);
1064         CTDB_NO_MEMORY(ctdb, state);
1065
1066         state->c    = talloc_steal(state, c);
1067
1068         ctdb_disable_monitoring(ctdb);
1069
1070         ret = ctdb_event_script_callback(ctdb, state,
1071                                          ctdb_start_recovery_callback, 
1072                                          state, false,
1073                                          CTDB_EVENT_START_RECOVERY,
1074                                          "%s", "");
1075
1076         if (ret != 0) {
1077                 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1078                 talloc_free(state);
1079                 return -1;
1080         }
1081
1082         /* tell the control that we will be reply asynchronously */
1083         *async_reply = true;
1084         return 0;
1085 }
1086
1087 /*
1088  try to delete all these records as part of the vacuuming process
1089  and return the records we failed to delete
1090 */
1091 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1092 {
1093         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1094         struct ctdb_db_context *ctdb_db;
1095         int i;
1096         struct ctdb_rec_data *rec;
1097         struct ctdb_marshall_buffer *records;
1098
1099         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1100                 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1101                 return -1;
1102         }
1103
1104         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1105         if (!ctdb_db) {
1106                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1107                 return -1;
1108         }
1109
1110
1111         DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1112                  reply->count, reply->db_id));
1113
1114
1115         /* create a blob to send back the records we couldnt delete */  
1116         records = (struct ctdb_marshall_buffer *)
1117                         talloc_zero_size(outdata, 
1118                                     offsetof(struct ctdb_marshall_buffer, data));
1119         if (records == NULL) {
1120                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1121                 return -1;
1122         }
1123         records->db_id = ctdb_db->db_id;
1124
1125
1126         rec = (struct ctdb_rec_data *)&reply->data[0];
1127         for (i=0;i<reply->count;i++) {
1128                 TDB_DATA key, data;
1129
1130                 key.dptr = &rec->data[0];
1131                 key.dsize = rec->keylen;
1132                 data.dptr = &rec->data[key.dsize];
1133                 data.dsize = rec->datalen;
1134
1135                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1136                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1137                         return -1;
1138                 }
1139
1140                 /* If we cant delete the record we must add it to the reply
1141                    so the lmaster knows it may not purge this record
1142                 */
1143                 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1144                         size_t old_size;
1145                         struct ctdb_ltdb_header *hdr;
1146
1147                         hdr = (struct ctdb_ltdb_header *)data.dptr;
1148                         data.dptr += sizeof(*hdr);
1149                         data.dsize -= sizeof(*hdr);
1150
1151                         DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1152
1153                         old_size = talloc_get_size(records);
1154                         records = talloc_realloc_size(outdata, records, old_size + rec->length);
1155                         if (records == NULL) {
1156                                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1157                                 return -1;
1158                         }
1159                         records->count++;
1160                         memcpy(old_size+(uint8_t *)records, rec, rec->length);
1161                 } 
1162
1163                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1164         }           
1165
1166
1167         outdata->dptr = (uint8_t *)records;
1168         outdata->dsize = talloc_get_size(records);
1169
1170         return 0;
1171 }
1172
1173 /*
1174   report capabilities
1175  */
1176 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1177 {
1178         uint32_t *capabilities = NULL;
1179
1180         capabilities = talloc(outdata, uint32_t);
1181         CTDB_NO_MEMORY(ctdb, capabilities);
1182         *capabilities = ctdb->capabilities;
1183
1184         outdata->dsize = sizeof(uint32_t);
1185         outdata->dptr = (uint8_t *)capabilities;
1186
1187         return 0;       
1188 }
1189
1190 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1191 {
1192         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1193         uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1194
1195         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1196
1197         if (*count < ctdb->tunable.recd_ping_failcount) {
1198                 (*count)++;
1199                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1200                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1201                         ctdb_recd_ping_timeout, ctdb);
1202                 return;
1203         }
1204
1205         DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1206
1207         ctdb_stop_recoverd(ctdb);
1208         ctdb_start_recoverd(ctdb);
1209 }
1210
1211 /* The recovery daemon will ping us at regular intervals.
1212    If we havent been pinged for a while we assume the recovery
1213    daemon is inoperable and we shut down.
1214 */
1215 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1216 {
1217         talloc_free(ctdb->recd_ping_count);
1218
1219         ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1220         CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1221
1222         if (ctdb->tunable.recd_ping_timeout != 0) {
1223                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1224                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1225                         ctdb_recd_ping_timeout, ctdb);
1226         }
1227
1228         return 0;
1229 }
1230
1231
1232
1233 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1234 {
1235         CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1236
1237         ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
1238         return 0;
1239 }
1240
1241
1242 struct stop_node_callback_state {
1243         struct ctdb_req_control *c;
1244 };
1245
1246 /*
1247   called when the 'stopped' event script has finished
1248  */
1249 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1250 {
1251         struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
1252
1253         if (status != 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1255                 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1256                 if (status == -ETIME) {
1257                         ctdb_ban_self(ctdb);
1258                 }
1259         }
1260
1261         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1262         talloc_free(state);
1263 }
1264
1265 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1266 {
1267         int ret;
1268         struct stop_node_callback_state *state;
1269
1270         DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1271
1272         state = talloc(ctdb, struct stop_node_callback_state);
1273         CTDB_NO_MEMORY(ctdb, state);
1274
1275         state->c    = talloc_steal(state, c);
1276
1277         ctdb_disable_monitoring(ctdb);
1278
1279         ret = ctdb_event_script_callback(ctdb, state,
1280                                          ctdb_stop_node_callback, 
1281                                          state, false,
1282                                          CTDB_EVENT_STOPPED, "%s", "");
1283
1284         if (ret != 0) {
1285                 ctdb_enable_monitoring(ctdb);
1286
1287                 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
1288                 talloc_free(state);
1289                 return -1;
1290         }
1291
1292         ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1293
1294         *async_reply = true;
1295
1296         return 0;
1297 }
1298
1299 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1300 {
1301         DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1302         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1303
1304         return 0;
1305 }