ReadOnly: Change the ctdb_db structure to keep a uint8_t for flags instead of a boole...
[ctdb.git] / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "db_wrap.h"
30
31 /*
32   lock all databases - mark only
33  */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb, uint32_t priority)
35 {
36         struct ctdb_db_context *ctdb_db;
37
38         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
39                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
40                 return -1;
41         }
42
43         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
44                 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
45                 return -1;
46         }
47         /* The dual loop is a woraround for older versions of samba
48            that does not yet support the set-db-priority/lock order
49            call. So that we get basic deadlock avoiidance also for
50            these old versions of samba.
51            This code will be removed in the future.
52         */
53         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
54                 if (ctdb_db->priority != priority) {
55                         continue;
56                 }
57                 if (strstr(ctdb_db->db_name, "notify") != NULL) {
58                         continue;
59                 }
60                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
61                         return -1;
62                 }
63                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
64                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
65                         return -1;
66                 }
67         }
68         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
69                 if (ctdb_db->priority != priority) {
70                         continue;
71                 }
72                 if (strstr(ctdb_db->db_name, "notify") == NULL) {
73                         continue;
74                 }
75                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
76                         return -1;
77                 }
78                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
79                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
80                         return -1;
81                 }
82         }
83         return 0;
84 }
85
86 /*
87   lock all databases - unmark only
88  */
89 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb, uint32_t priority)
90 {
91         struct ctdb_db_context *ctdb_db;
92
93         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
94                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
95                 return -1;
96         }
97
98         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
99                 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
100                 return -1;
101         }
102         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
103                 if (ctdb_db->priority != priority) {
104                         continue;
105                 }
106                 tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
107                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
108                         return -1;
109                 }
110         }
111         return 0;
112 }
113
114
115 int 
116 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
117 {
118         CHECK_CONTROL_DATA_SIZE(0);
119         struct ctdb_vnn_map_wire *map;
120         size_t len;
121
122         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
123         map = talloc_size(outdata, len);
124         CTDB_NO_MEMORY(ctdb, map);
125
126         map->generation = ctdb->vnn_map->generation;
127         map->size = ctdb->vnn_map->size;
128         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
129
130         outdata->dsize = len;
131         outdata->dptr  = (uint8_t *)map;
132
133         return 0;
134 }
135
136 int 
137 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
138 {
139         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
140         int i;
141
142         for(i=1; i<=NUM_DB_PRIORITIES; i++) {
143                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
144                         DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
145                         return -1;
146                 }
147         }
148
149         talloc_free(ctdb->vnn_map);
150
151         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
152         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
153
154         ctdb->vnn_map->generation = map->generation;
155         ctdb->vnn_map->size       = map->size;
156         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
157         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
158
159         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
160
161         return 0;
162 }
163
164 int 
165 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
166 {
167         uint32_t i, len;
168         struct ctdb_db_context *ctdb_db;
169         struct ctdb_dbid_map *dbid_map;
170
171         CHECK_CONTROL_DATA_SIZE(0);
172
173         len = 0;
174         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
175                 len++;
176         }
177
178
179         outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
180         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
181         if (!outdata->dptr) {
182                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
183                 exit(1);
184         }
185
186         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
187         dbid_map->num = len;
188         for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
189                 dbid_map->dbs[i].dbid       = ctdb_db->db_id;
190                 if (ctdb_db->persistent != 0) {
191                         dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
192                 }
193         }
194
195         return 0;
196 }
197
198 int 
199 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
200 {
201         uint32_t i, num_nodes;
202         struct ctdb_node_map *node_map;
203
204         CHECK_CONTROL_DATA_SIZE(0);
205
206         num_nodes = ctdb->num_nodes;
207
208         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
209         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
210         if (!outdata->dptr) {
211                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
212                 exit(1);
213         }
214
215         node_map = (struct ctdb_node_map *)outdata->dptr;
216         node_map->num = num_nodes;
217         for (i=0; i<num_nodes; i++) {
218                 if (parse_ip(ctdb->nodes[i]->address.address,
219                              NULL, /* TODO: pass in the correct interface here*/
220                              0,
221                              &node_map->nodes[i].addr) == 0)
222                 {
223                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
224                 }
225
226                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
227                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
228         }
229
230         return 0;
231 }
232
233 /*
234    get an old style ipv4-only nodemap
235 */
236 int 
237 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
238 {
239         uint32_t i, num_nodes;
240         struct ctdb_node_mapv4 *node_map;
241
242         CHECK_CONTROL_DATA_SIZE(0);
243
244         num_nodes = ctdb->num_nodes;
245
246         outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
247         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
248         if (!outdata->dptr) {
249                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
250                 exit(1);
251         }
252
253         node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
254         node_map->num = num_nodes;
255         for (i=0; i<num_nodes; i++) {
256                 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
257                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
258                         return -1;
259                 }
260
261                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
262                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
263         }
264
265         return 0;
266 }
267
268 static void
269 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te, 
270                                struct timeval t, void *private_data)
271 {
272         int i, num_nodes;
273         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
274         TALLOC_CTX *tmp_ctx;
275         struct ctdb_node **nodes;       
276
277         tmp_ctx = talloc_new(ctdb);
278
279         /* steal the old nodes file for a while */
280         talloc_steal(tmp_ctx, ctdb->nodes);
281         nodes = ctdb->nodes;
282         ctdb->nodes = NULL;
283         num_nodes = ctdb->num_nodes;
284         ctdb->num_nodes = 0;
285
286         /* load the new nodes file */
287         ctdb_load_nodes_file(ctdb);
288
289         for (i=0; i<ctdb->num_nodes; i++) {
290                 /* keep any identical pre-existing nodes and connections */
291                 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
292                         talloc_free(ctdb->nodes[i]);
293                         ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
294                         continue;
295                 }
296
297                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
298                         continue;
299                 }
300
301                 /* any new or different nodes must be added */
302                 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
303                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
304                         ctdb_fatal(ctdb, "failed to add node. shutting down\n");
305                 }
306                 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
307                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
308                         ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
309                 }
310         }
311
312         /* tell the recovery daemon to reaload the nodes file too */
313         ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
314
315         talloc_free(tmp_ctx);
316         return;
317 }
318
319 /*
320   reload the nodes file after a short delay (so that we can send the response
321   back first
322 */
323 int 
324 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
325 {
326         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
327
328         return 0;
329 }
330
331 /* 
332    a traverse function for pulling all relevent records from pulldb
333  */
334 struct pulldb_data {
335         struct ctdb_context *ctdb;
336         struct ctdb_marshall_buffer *pulldata;
337         uint32_t len;
338         bool failed;
339 };
340
341 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
342 {
343         struct pulldb_data *params = (struct pulldb_data *)p;
344         struct ctdb_rec_data *rec;
345
346         /* add the record to the blob */
347         rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
348         if (rec == NULL) {
349                 params->failed = true;
350                 return -1;
351         }
352         params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
353         if (params->pulldata == NULL) {
354                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
355                 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
356         }
357         params->pulldata->count++;
358         memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
359         params->len += rec->length;
360         talloc_free(rec);
361
362         return 0;
363 }
364
365 /*
366   pul a bunch of records from a ltdb, filtering by lmaster
367  */
368 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
369 {
370         struct ctdb_control_pulldb *pull;
371         struct ctdb_db_context *ctdb_db;
372         struct pulldb_data params;
373         struct ctdb_marshall_buffer *reply;
374
375         pull = (struct ctdb_control_pulldb *)indata.dptr;
376         
377         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
378         if (!ctdb_db) {
379                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
380                 return -1;
381         }
382
383         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
384                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
385                 return -1;
386         }
387
388         reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
389         CTDB_NO_MEMORY(ctdb, reply);
390
391         reply->db_id = pull->db_id;
392
393         params.ctdb = ctdb;
394         params.pulldata = reply;
395         params.len = offsetof(struct ctdb_marshall_buffer, data);
396         params.failed = false;
397
398         if (ctdb_db->unhealthy_reason) {
399                 /* this is just a warning, as the tdb should be empty anyway */
400                 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
401                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
402         }
403
404         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
405                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
406                 return -1;
407         }
408
409         if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
410                 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
411                 ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
412                 talloc_free(params.pulldata);
413                 return -1;
414         }
415
416         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
417
418         outdata->dptr = (uint8_t *)params.pulldata;
419         outdata->dsize = params.len;
420
421         return 0;
422 }
423
424 /*
425   push a bunch of records into a ltdb, filtering by rsn
426  */
427 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
428 {
429         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
430         struct ctdb_db_context *ctdb_db;
431         int i, ret;
432         struct ctdb_rec_data *rec;
433
434         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
435                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
436                 return -1;
437         }
438
439         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
440         if (!ctdb_db) {
441                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
442                 return -1;
443         }
444
445         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
446                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
447                 return -1;
448         }
449
450         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
451                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
452                 return -1;
453         }
454
455         rec = (struct ctdb_rec_data *)&reply->data[0];
456
457         DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
458                  reply->count, reply->db_id));
459
460         for (i=0;i<reply->count;i++) {
461                 TDB_DATA key, data;
462                 struct ctdb_ltdb_header *hdr;
463
464                 key.dptr = &rec->data[0];
465                 key.dsize = rec->keylen;
466                 data.dptr = &rec->data[key.dsize];
467                 data.dsize = rec->datalen;
468
469                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
470                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
471                         goto failed;
472                 }
473                 hdr = (struct ctdb_ltdb_header *)data.dptr;
474                 /* strip off any read only record flags. All readonly records
475                    are revoked implicitely by a recovery
476                 */
477                 hdr->flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
478
479                 data.dptr += sizeof(*hdr);
480                 data.dsize -= sizeof(*hdr);
481
482                 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
483                 if (ret != 0) {
484                         DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
485                         goto failed;
486                 }
487
488                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
489         }           
490
491         DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
492                  reply->count, reply->db_id));
493
494         if (ctdb_db->readonly) {
495                 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
496                                   ctdb_db->db_id));
497                 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
498                         DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
499                         ctdb_db->readonly = false;
500                         tdb_close(ctdb_db->rottdb);
501                         ctdb_db->rottdb = NULL;
502                         ctdb_db->readonly = false;
503                 }
504                 while (ctdb_db->revokechild_active != NULL) {
505                         talloc_free(ctdb_db->revokechild_active);
506                         ctdb_db->revokechild_active = NULL;
507                 }
508         }
509
510         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
511         return 0;
512
513 failed:
514         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
515         return -1;
516 }
517
518
519 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
520 {
521         uint32_t *dmaster = (uint32_t *)p;
522         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
523         int ret;
524
525         /* skip if already correct */
526         if (header->dmaster == *dmaster) {
527                 return 0;
528         }
529
530         header->dmaster = *dmaster;
531
532         ret = tdb_store(tdb, key, data, TDB_REPLACE);
533         if (ret) {
534                 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back  ret:%d\n",ret));
535                 return ret;
536         }
537
538         /* TODO: add error checking here */
539
540         return 0;
541 }
542
543 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
544 {
545         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
546         struct ctdb_db_context *ctdb_db;
547
548         ctdb_db = find_ctdb_db(ctdb, p->db_id);
549         if (!ctdb_db) {
550                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
551                 return -1;
552         }
553
554         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
555                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
556                 return -1;
557         }
558
559         if (ctdb_lock_all_databases_mark(ctdb,  ctdb_db->priority) != 0) {
560                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
561                 return -1;
562         }
563
564         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
565
566         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
567         
568         return 0;
569 }
570
571 struct ctdb_set_recmode_state {
572         struct ctdb_context *ctdb;
573         struct ctdb_req_control *c;
574         uint32_t recmode;
575         int fd[2];
576         struct timed_event *te;
577         struct fd_event *fde;
578         pid_t child;
579         struct timeval start_time;
580 };
581
582 /*
583   called if our set_recmode child times out. this would happen if
584   ctdb_recovery_lock() would block.
585  */
586 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te, 
587                                          struct timeval t, void *private_data)
588 {
589         struct ctdb_set_recmode_state *state = talloc_get_type(private_data, 
590                                            struct ctdb_set_recmode_state);
591
592         /* we consider this a success, not a failure, as we failed to
593            set the recovery lock which is what we wanted.  This can be
594            caused by the cluster filesystem being very slow to
595            arbitrate locks immediately after a node failure.       
596          */
597         DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
598         state->ctdb->recovery_mode = state->recmode;
599         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
600         talloc_free(state);
601 }
602
603
604 /* when we free the recmode state we must kill any child process.
605 */
606 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
607 {
608         double l = timeval_elapsed(&state->start_time);
609
610         CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
611
612         if (state->fd[0] != -1) {
613                 state->fd[0] = -1;
614         }
615         if (state->fd[1] != -1) {
616                 state->fd[1] = -1;
617         }
618         kill(state->child, SIGKILL);
619         return 0;
620 }
621
622 /* this is called when the client process has completed ctdb_recovery_lock()
623    and has written data back to us through the pipe.
624 */
625 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde, 
626                              uint16_t flags, void *private_data)
627 {
628         struct ctdb_set_recmode_state *state= talloc_get_type(private_data, 
629                                              struct ctdb_set_recmode_state);
630         char c = 0;
631         int ret;
632
633         /* we got a response from our child process so we can abort the
634            timeout.
635         */
636         talloc_free(state->te);
637         state->te = NULL;
638
639
640         /* read the childs status when trying to lock the reclock file.
641            child wrote 0 if everything is fine and 1 if it did manage
642            to lock the file, which would be a problem since that means
643            we got a request to exit from recovery but we could still lock
644            the file   which at this time SHOULD be locked by the recovery
645            daemon on the recmaster
646         */              
647         ret = read(state->fd[0], &c, 1);
648         if (ret != 1 || c != 0) {
649                 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
650                 talloc_free(state);
651                 return;
652         }
653
654         state->ctdb->recovery_mode = state->recmode;
655
656         /* release any deferred attach calls from clients */
657         if (state->recmode == CTDB_RECOVERY_NORMAL) {
658                 ctdb_process_deferred_attach(state->ctdb);
659         }
660
661         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
662         talloc_free(state);
663         return;
664 }
665
666 static void
667 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te, 
668                                struct timeval t, void *private_data)
669 {
670         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
671
672         DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
673         talloc_free(ctdb->release_ips_ctx);
674         ctdb->release_ips_ctx = NULL;
675
676         ctdb_release_all_ips(ctdb);
677 }
678
679 /*
680  * Set up an event to drop all public ips if we remain in recovery for too
681  * long
682  */
683 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
684 {
685         if (ctdb->release_ips_ctx != NULL) {
686                 talloc_free(ctdb->release_ips_ctx);
687         }
688         ctdb->release_ips_ctx = talloc_new(ctdb);
689         CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
690
691         event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
692         return 0;
693 }
694
695 /*
696   set the recovery mode
697  */
698 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
699                                  struct ctdb_req_control *c,
700                                  TDB_DATA indata, bool *async_reply,
701                                  const char **errormsg)
702 {
703         uint32_t recmode = *(uint32_t *)indata.dptr;
704         int i, ret;
705         struct ctdb_set_recmode_state *state;
706         pid_t parent = getpid();
707
708         /* if we enter recovery but stay in recovery for too long
709            we will eventually drop all our ip addresses
710         */
711         if (recmode == CTDB_RECOVERY_NORMAL) {
712                 talloc_free(ctdb->release_ips_ctx);
713                 ctdb->release_ips_ctx = NULL;
714         } else {
715                 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
716                         DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
717                 }
718         }
719
720         if (recmode != ctdb->recovery_mode) {
721                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n", 
722                          recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
723         }
724
725         if (recmode != CTDB_RECOVERY_NORMAL ||
726             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
727                 ctdb->recovery_mode = recmode;
728                 return 0;
729         }
730
731         /* some special handling when ending recovery mode */
732
733         /* force the databases to thaw */
734         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
735                 if (ctdb->freeze_handles[i] != NULL) {
736                         ctdb_control_thaw(ctdb, i);
737                 }
738         }
739
740         state = talloc(ctdb, struct ctdb_set_recmode_state);
741         CTDB_NO_MEMORY(ctdb, state);
742
743         state->start_time = timeval_current();
744         state->fd[0] = -1;
745         state->fd[1] = -1;
746
747         /* release any deferred attach calls from clients */
748         if (recmode == CTDB_RECOVERY_NORMAL) {
749                 ctdb_process_deferred_attach(ctdb);
750         }
751
752         if (ctdb->tunable.verify_recovery_lock == 0) {
753                 /* dont need to verify the reclock file */
754                 ctdb->recovery_mode = recmode;
755                 return 0;
756         }
757
758         /* For the rest of what needs to be done, we need to do this in
759            a child process since 
760            1, the call to ctdb_recovery_lock() can block if the cluster
761               filesystem is in the process of recovery.
762         */
763         ret = pipe(state->fd);
764         if (ret != 0) {
765                 talloc_free(state);
766                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
767                 return -1;
768         }
769
770         state->child = fork();
771         if (state->child == (pid_t)-1) {
772                 close(state->fd[0]);
773                 close(state->fd[1]);
774                 talloc_free(state);
775                 return -1;
776         }
777
778         if (state->child == 0) {
779                 char cc = 0;
780                 close(state->fd[0]);
781
782                 debug_extra = talloc_asprintf(NULL, "set_recmode:");
783                 /* we should not be able to get the lock on the reclock file, 
784                   as it should  be held by the recovery master 
785                 */
786                 if (ctdb_recovery_lock(ctdb, false)) {
787                         DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
788                         cc = 1;
789                 }
790
791                 write(state->fd[1], &cc, 1);
792                 /* make sure we die when our parent dies */
793                 while (kill(parent, 0) == 0 || errno != ESRCH) {
794                         sleep(5);
795                         write(state->fd[1], &cc, 1);
796                 }
797                 _exit(0);
798         }
799         close(state->fd[1]);
800         set_close_on_exec(state->fd[0]);
801
802         state->fd[1] = -1;
803
804         talloc_set_destructor(state, set_recmode_destructor);
805
806         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
807
808         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
809                                     ctdb_set_recmode_timeout, state);
810
811         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
812                                 EVENT_FD_READ,
813                                 set_recmode_handler,
814                                 (void *)state);
815
816         if (state->fde == NULL) {
817                 talloc_free(state);
818                 return -1;
819         }
820         tevent_fd_set_auto_close(state->fde);
821
822         state->ctdb    = ctdb;
823         state->recmode = recmode;
824         state->c       = talloc_steal(state, c);
825
826         *async_reply = true;
827
828         return 0;
829 }
830
831
832 /*
833   try and get the recovery lock in shared storage - should only work
834   on the recovery master recovery daemon. Anywhere else is a bug
835  */
836 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
837 {
838         struct flock lock;
839
840         if (keep) {
841                 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
842         }
843         if (ctdb->recovery_lock_fd != -1) {
844                 close(ctdb->recovery_lock_fd);
845                 ctdb->recovery_lock_fd = -1;
846         }
847
848         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
849         if (ctdb->recovery_lock_fd == -1) {
850                 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n", 
851                          ctdb->recovery_lock_file, strerror(errno)));
852                 return false;
853         }
854
855         set_close_on_exec(ctdb->recovery_lock_fd);
856
857         lock.l_type = F_WRLCK;
858         lock.l_whence = SEEK_SET;
859         lock.l_start = 0;
860         lock.l_len = 1;
861         lock.l_pid = 0;
862
863         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
864                 close(ctdb->recovery_lock_fd);
865                 ctdb->recovery_lock_fd = -1;
866                 if (keep) {
867                         DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
868                 }
869                 return false;
870         }
871
872         if (!keep) {
873                 close(ctdb->recovery_lock_fd);
874                 ctdb->recovery_lock_fd = -1;
875         }
876
877         if (keep) {
878                 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
879         }
880
881         DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
882
883         return true;
884 }
885
886 /*
887   delete a record as part of the vacuum process
888   only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
889   use non-blocking locks
890
891   return 0 if the record was successfully deleted (i.e. it does not exist
892   when the function returns)
893   or !0 is the record still exists in the tdb after returning.
894  */
895 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
896 {
897         TDB_DATA key, data;
898         struct ctdb_ltdb_header *hdr, *hdr2;
899         
900         /* these are really internal tdb functions - but we need them here for
901            non-blocking lock of the freelist */
902         int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
903         int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
904
905
906         key.dsize = rec->keylen;
907         key.dptr  = &rec->data[0];
908         data.dsize = rec->datalen;
909         data.dptr = &rec->data[rec->keylen];
910
911         if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
912                 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
913                 return -1;
914         }
915
916         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
917                 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
918                 return -1;
919         }
920
921         hdr = (struct ctdb_ltdb_header *)data.dptr;
922
923         /* use a non-blocking lock */
924         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
925                 return -1;
926         }
927
928         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
929         if (data.dptr == NULL) {
930                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
931                 return 0;
932         }
933
934         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
935                 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
936                         tdb_delete(ctdb_db->ltdb->tdb, key);
937                         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
938                         DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
939                 }
940                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
941                 free(data.dptr);
942                 return 0;
943         }
944         
945         hdr2 = (struct ctdb_ltdb_header *)data.dptr;
946
947         if (hdr2->rsn > hdr->rsn) {
948                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
949                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
950                          (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
951                 free(data.dptr);
952                 return -1;              
953         }
954
955         if (hdr2->dmaster == ctdb->pnn) {
956                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
957                 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
958                 free(data.dptr);
959                 return -1;                              
960         }
961
962         if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
963                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
964                 free(data.dptr);
965                 return -1;                              
966         }
967
968         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
969                 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
970                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
971                 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
972                 free(data.dptr);
973                 return -1;                                              
974         }
975
976         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
977         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
978         free(data.dptr);
979         return 0;       
980 }
981
982
983
984 struct recovery_callback_state {
985         struct ctdb_req_control *c;
986 };
987
988
989 /*
990   called when the 'recovered' event script has finished
991  */
992 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
993 {
994         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
995
996         ctdb_enable_monitoring(ctdb);
997         CTDB_INCREMENT_STAT(ctdb, num_recoveries);
998
999         if (status != 0) {
1000                 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1001                 if (status == -ETIME) {
1002                         ctdb_ban_self(ctdb);
1003                 }
1004         }
1005
1006         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1007         talloc_free(state);
1008
1009         gettimeofday(&ctdb->last_recovery_finished, NULL);
1010 }
1011
1012 /*
1013   recovery has finished
1014  */
1015 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb, 
1016                                 struct ctdb_req_control *c,
1017                                 bool *async_reply)
1018 {
1019         int ret;
1020         struct recovery_callback_state *state;
1021
1022         DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1023
1024         ctdb_persistent_finish_trans3_commits(ctdb);
1025
1026         state = talloc(ctdb, struct recovery_callback_state);
1027         CTDB_NO_MEMORY(ctdb, state);
1028
1029         state->c    = c;
1030
1031         ctdb_disable_monitoring(ctdb);
1032
1033         ret = ctdb_event_script_callback(ctdb, state,
1034                                          ctdb_end_recovery_callback, 
1035                                          state, 
1036                                          false,
1037                                          CTDB_EVENT_RECOVERED, "%s", "");
1038
1039         if (ret != 0) {
1040                 ctdb_enable_monitoring(ctdb);
1041
1042                 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1043                 talloc_free(state);
1044                 return -1;
1045         }
1046
1047         /* tell the control that we will be reply asynchronously */
1048         state->c    = talloc_steal(state, c);
1049         *async_reply = true;
1050         return 0;
1051 }
1052
1053 /*
1054   called when the 'startrecovery' event script has finished
1055  */
1056 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1057 {
1058         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1059
1060         if (status != 0) {
1061                 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1062         }
1063
1064         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1065         talloc_free(state);
1066 }
1067
1068 /*
1069   run the startrecovery eventscript
1070  */
1071 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb, 
1072                                 struct ctdb_req_control *c,
1073                                 bool *async_reply)
1074 {
1075         int ret;
1076         struct recovery_callback_state *state;
1077
1078         DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1079         gettimeofday(&ctdb->last_recovery_started, NULL);
1080
1081         state = talloc(ctdb, struct recovery_callback_state);
1082         CTDB_NO_MEMORY(ctdb, state);
1083
1084         state->c    = talloc_steal(state, c);
1085
1086         ctdb_disable_monitoring(ctdb);
1087
1088         ret = ctdb_event_script_callback(ctdb, state,
1089                                          ctdb_start_recovery_callback, 
1090                                          state, false,
1091                                          CTDB_EVENT_START_RECOVERY,
1092                                          "%s", "");
1093
1094         if (ret != 0) {
1095                 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1096                 talloc_free(state);
1097                 return -1;
1098         }
1099
1100         /* tell the control that we will be reply asynchronously */
1101         *async_reply = true;
1102         return 0;
1103 }
1104
1105 /*
1106  try to delete all these records as part of the vacuuming process
1107  and return the records we failed to delete
1108 */
1109 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1110 {
1111         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1112         struct ctdb_db_context *ctdb_db;
1113         int i;
1114         struct ctdb_rec_data *rec;
1115         struct ctdb_marshall_buffer *records;
1116
1117         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1118                 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1119                 return -1;
1120         }
1121
1122         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1123         if (!ctdb_db) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1125                 return -1;
1126         }
1127
1128
1129         DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1130                  reply->count, reply->db_id));
1131
1132
1133         /* create a blob to send back the records we couldnt delete */  
1134         records = (struct ctdb_marshall_buffer *)
1135                         talloc_zero_size(outdata, 
1136                                     offsetof(struct ctdb_marshall_buffer, data));
1137         if (records == NULL) {
1138                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1139                 return -1;
1140         }
1141         records->db_id = ctdb_db->db_id;
1142
1143
1144         rec = (struct ctdb_rec_data *)&reply->data[0];
1145         for (i=0;i<reply->count;i++) {
1146                 TDB_DATA key, data;
1147
1148                 key.dptr = &rec->data[0];
1149                 key.dsize = rec->keylen;
1150                 data.dptr = &rec->data[key.dsize];
1151                 data.dsize = rec->datalen;
1152
1153                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1154                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1155                         return -1;
1156                 }
1157
1158                 /* If we cant delete the record we must add it to the reply
1159                    so the lmaster knows it may not purge this record
1160                 */
1161                 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1162                         size_t old_size;
1163                         struct ctdb_ltdb_header *hdr;
1164
1165                         hdr = (struct ctdb_ltdb_header *)data.dptr;
1166                         data.dptr += sizeof(*hdr);
1167                         data.dsize -= sizeof(*hdr);
1168
1169                         DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1170
1171                         old_size = talloc_get_size(records);
1172                         records = talloc_realloc_size(outdata, records, old_size + rec->length);
1173                         if (records == NULL) {
1174                                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1175                                 return -1;
1176                         }
1177                         records->count++;
1178                         memcpy(old_size+(uint8_t *)records, rec, rec->length);
1179                 } 
1180
1181                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1182         }           
1183
1184
1185         outdata->dptr = (uint8_t *)records;
1186         outdata->dsize = talloc_get_size(records);
1187
1188         return 0;
1189 }
1190
1191 /*
1192   report capabilities
1193  */
1194 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1195 {
1196         uint32_t *capabilities = NULL;
1197
1198         capabilities = talloc(outdata, uint32_t);
1199         CTDB_NO_MEMORY(ctdb, capabilities);
1200         *capabilities = ctdb->capabilities;
1201
1202         outdata->dsize = sizeof(uint32_t);
1203         outdata->dptr = (uint8_t *)capabilities;
1204
1205         return 0;       
1206 }
1207
1208 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1209 {
1210         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1211         uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1212
1213         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1214
1215         if (*count < ctdb->tunable.recd_ping_failcount) {
1216                 (*count)++;
1217                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1218                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1219                         ctdb_recd_ping_timeout, ctdb);
1220                 return;
1221         }
1222
1223         DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1224
1225         ctdb_stop_recoverd(ctdb);
1226         ctdb_start_recoverd(ctdb);
1227 }
1228
1229 /* The recovery daemon will ping us at regular intervals.
1230    If we havent been pinged for a while we assume the recovery
1231    daemon is inoperable and we shut down.
1232 */
1233 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1234 {
1235         talloc_free(ctdb->recd_ping_count);
1236
1237         ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1238         CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1239
1240         if (ctdb->tunable.recd_ping_timeout != 0) {
1241                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1242                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1243                         ctdb_recd_ping_timeout, ctdb);
1244         }
1245
1246         return 0;
1247 }
1248
1249
1250
1251 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1252 {
1253         CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1254
1255         ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
1256         return 0;
1257 }
1258
1259
1260 struct stop_node_callback_state {
1261         struct ctdb_req_control *c;
1262 };
1263
1264 /*
1265   called when the 'stopped' event script has finished
1266  */
1267 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1268 {
1269         struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
1270
1271         if (status != 0) {
1272                 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1273                 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1274                 if (status == -ETIME) {
1275                         ctdb_ban_self(ctdb);
1276                 }
1277         }
1278
1279         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1280         talloc_free(state);
1281 }
1282
1283 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1284 {
1285         int ret;
1286         struct stop_node_callback_state *state;
1287
1288         DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1289
1290         state = talloc(ctdb, struct stop_node_callback_state);
1291         CTDB_NO_MEMORY(ctdb, state);
1292
1293         state->c    = talloc_steal(state, c);
1294
1295         ctdb_disable_monitoring(ctdb);
1296
1297         ret = ctdb_event_script_callback(ctdb, state,
1298                                          ctdb_stop_node_callback, 
1299                                          state, false,
1300                                          CTDB_EVENT_STOPPED, "%s", "");
1301
1302         if (ret != 0) {
1303                 ctdb_enable_monitoring(ctdb);
1304
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
1306                 talloc_free(state);
1307                 return -1;
1308         }
1309
1310         ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1311
1312         *async_reply = true;
1313
1314         return 0;
1315 }
1316
1317 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1318 {
1319         DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1320         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1321
1322         return 0;
1323 }