Make the correct module name.
[gd/samba-autobuild/.git] / source4 / cluster / ctdb / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30   lock all databases - mark only
31  */
32 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
33 {
34         struct ctdb_db_context *ctdb_db;
35         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
36                 DEBUG(0,("Attempt to mark all databases locked when not frozen\n"));
37                 return -1;
38         }
39         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
40                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
41                         return -1;
42                 }
43         }
44         return 0;
45 }
46
47 /*
48   lock all databases - unmark only
49  */
50 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
51 {
52         struct ctdb_db_context *ctdb_db;
53         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
54                 DEBUG(0,("Attempt to unmark all databases locked when not frozen\n"));
55                 return -1;
56         }
57         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
58                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
59                         return -1;
60                 }
61         }
62         return 0;
63 }
64
65
66 int 
67 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 {
69         CHECK_CONTROL_DATA_SIZE(0);
70         struct ctdb_vnn_map_wire *map;
71         size_t len;
72
73         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
74         map = talloc_size(outdata, len);
75         CTDB_NO_MEMORY_VOID(ctdb, map);
76
77         map->generation = ctdb->vnn_map->generation;
78         map->size = ctdb->vnn_map->size;
79         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
80
81         outdata->dsize = len;
82         outdata->dptr  = (uint8_t *)map;
83
84         return 0;
85 }
86
87 int 
88 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
89 {
90         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
91
92         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
93                 DEBUG(0,("Attempt to set vnnmap when not frozen\n"));
94                 return -1;
95         }
96
97         talloc_free(ctdb->vnn_map);
98
99         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
100         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
101
102         ctdb->vnn_map->generation = map->generation;
103         ctdb->vnn_map->size       = map->size;
104         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
105         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
106
107         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
108
109         return 0;
110 }
111
112 int 
113 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
114 {
115         uint32_t i, len;
116         struct ctdb_db_context *ctdb_db;
117         struct ctdb_dbid_map *dbid_map;
118
119         CHECK_CONTROL_DATA_SIZE(0);
120
121         len = 0;
122         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
123                 len++;
124         }
125
126
127         outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len;
128         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
129         if (!outdata->dptr) {
130                 DEBUG(0, (__location__ " Failed to allocate dbmap array\n"));
131                 exit(1);
132         }
133
134         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
135         dbid_map->num = len;
136         for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
137                 dbid_map->dbids[i] = ctdb_db->db_id;
138         }
139
140         return 0;
141 }
142
143 int 
144 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
145 {
146         uint32_t i, num_nodes;
147         struct ctdb_node_map *node_map;
148
149         CHECK_CONTROL_DATA_SIZE(0);
150
151         num_nodes = ctdb->num_nodes;
152
153         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
154         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
155         if (!outdata->dptr) {
156                 DEBUG(0, (__location__ " Failed to allocate nodemap array\n"));
157                 exit(1);
158         }
159
160         node_map = (struct ctdb_node_map *)outdata->dptr;
161         node_map->num = num_nodes;
162         for (i=0; i<num_nodes; i++) {
163                 inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr);
164                 node_map->nodes[i].vnn   = ctdb->nodes[i]->vnn;
165                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
166         }
167
168         return 0;
169 }
170
171 struct getkeys_params {
172         struct ctdb_context *ctdb;
173         uint32_t lmaster;
174         uint32_t rec_count;
175         struct getkeys_rec {
176                 TDB_DATA key;
177                 TDB_DATA data;
178         } *recs;
179 };
180
181 static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
182 {
183         struct getkeys_params *params = (struct getkeys_params *)p;
184         uint32_t lmaster;
185
186         lmaster = ctdb_lmaster(params->ctdb, &key);
187
188         /* only include this record if the lmaster matches or if
189            the wildcard lmaster (-1) was specified.
190         */
191         if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) {
192                 return 0;
193         }
194
195         params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1);
196         key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize);
197         data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize);
198         params->recs[params->rec_count].key = key;
199         params->recs[params->rec_count].data = data;
200         params->rec_count++;
201
202         return 0;
203 }
204
205 /*
206   pul a bunch of records from a ltdb, filtering by lmaster
207  */
208 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
209 {
210         struct ctdb_control_pulldb *pull;
211         struct ctdb_db_context *ctdb_db;
212         struct getkeys_params params;
213         struct ctdb_control_pulldb_reply *reply;
214         int i;
215         size_t len = 0;
216
217         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
218                 DEBUG(0,("rejecting ctdb_control_pull_db when not frozen\n"));
219                 return -1;
220         }
221
222         pull = (struct ctdb_control_pulldb *)indata.dptr;
223         
224         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
225         if (!ctdb_db) {
226                 DEBUG(0,(__location__ " Unknown db\n"));
227                 return -1;
228         }
229
230         params.ctdb = ctdb;
231         params.lmaster = pull->lmaster;
232
233         params.rec_count = 0;
234         params.recs = talloc_array(outdata, struct getkeys_rec, 0);
235         CTDB_NO_MEMORY(ctdb, params.recs);
236
237         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
238                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
239                 return -1;
240         }
241
242         tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);
243
244         ctdb_lock_all_databases_unmark(ctdb);
245
246         reply = talloc(outdata, struct ctdb_control_pulldb_reply);
247         CTDB_NO_MEMORY(ctdb, reply);
248
249         reply->db_id = pull->db_id;
250         reply->count = params.rec_count;
251
252         len = offsetof(struct ctdb_control_pulldb_reply, data);
253
254         for (i=0;i<reply->count;i++) {
255                 struct ctdb_rec_data *rec;
256                 rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
257                 reply = talloc_realloc_size(outdata, reply, rec->length + len);
258                 memcpy(len+(uint8_t *)reply, rec, rec->length);
259                 len += rec->length;
260                 talloc_free(rec);
261         }
262
263         talloc_free(params.recs);
264
265         outdata->dptr = (uint8_t *)reply;
266         outdata->dsize = len;
267
268         return 0;
269 }
270
271 /*
272   push a bunch of records into a ltdb, filtering by rsn
273  */
274 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
275 {
276         struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
277         struct ctdb_db_context *ctdb_db;
278         int i, ret;
279         struct ctdb_rec_data *rec;
280
281         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
282                 DEBUG(0,("rejecting ctdb_control_push_db when not frozen\n"));
283                 return -1;
284         }
285
286         if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
287                 DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
288                 return -1;
289         }
290
291         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
292         if (!ctdb_db) {
293                 DEBUG(0,(__location__ " Unknown db 0x%08x\n", reply->db_id));
294                 return -1;
295         }
296
297         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
298                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
299                 return -1;
300         }
301
302         rec = (struct ctdb_rec_data *)&reply->data[0];
303
304         DEBUG(3,("starting push of %u records for dbid 0x%x\n",
305                  reply->count, reply->db_id));
306
307         for (i=0;i<reply->count;i++) {
308                 TDB_DATA key, data;
309                 struct ctdb_ltdb_header *hdr, header;
310
311                 key.dptr = &rec->data[0];
312                 key.dsize = rec->keylen;
313                 data.dptr = &rec->data[key.dsize];
314                 data.dsize = rec->datalen;
315
316                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
317                         DEBUG(0,(__location__ " bad ltdb record\n"));
318                         goto failed;
319                 }
320                 hdr = (struct ctdb_ltdb_header *)data.dptr;
321                 data.dptr += sizeof(*hdr);
322                 data.dsize -= sizeof(*hdr);
323
324                 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
325                 if (ret != 0) {
326                         DEBUG(0, (__location__ " Unable to fetch record\n"));
327                         goto failed;
328                 }
329                 /* The check for dmaster gives priority to the dmaster
330                    if the rsn values are equal */
331                 if (header.rsn < hdr->rsn ||
332                     (header.dmaster != ctdb->vnn && header.rsn == hdr->rsn)) {
333                         ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
334                         if (ret != 0) {
335                                 DEBUG(0, (__location__ " Unable to store record\n"));
336                                 goto failed;
337                         }
338                 }
339
340                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
341         }           
342
343         DEBUG(3,("finished push of %u records for dbid 0x%x\n",
344                  reply->count, reply->db_id));
345
346         ctdb_lock_all_databases_unmark(ctdb);
347         return 0;
348
349 failed:
350         ctdb_lock_all_databases_unmark(ctdb);
351         return -1;
352 }
353
354
355 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
356 {
357         uint32_t *dmaster = (uint32_t *)p;
358         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
359         int ret;
360
361         header->dmaster = *dmaster;
362
363         ret = tdb_store(tdb, key, data, TDB_REPLACE);
364         if (ret) {
365                 DEBUG(0,(__location__ " failed to write tdb data back  ret:%d\n",ret));
366                 return ret;
367         }
368         return 0;
369 }
370
371 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
372 {
373         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
374         struct ctdb_db_context *ctdb_db;
375
376         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
377                 DEBUG(0,("rejecting ctdb_control_set_dmaster when not frozen\n"));
378                 return -1;
379         }
380
381         ctdb_db = find_ctdb_db(ctdb, p->db_id);
382         if (!ctdb_db) {
383                 DEBUG(0,(__location__ " Unknown db 0x%08x\n", p->db_id));
384                 return -1;
385         }
386
387         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
388                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
389                 return -1;
390         }
391
392         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
393
394         ctdb_lock_all_databases_unmark(ctdb);
395         
396         return 0;
397 }
398
399 struct ctdb_set_recmode_state {
400         struct ctdb_req_control *c;
401         uint32_t recmode;
402 };
403
404 /*
405   called when the 'recovered' event script has finished
406  */
407 static void ctdb_recovered_callback(struct ctdb_context *ctdb, int status, void *p)
408 {
409         struct ctdb_set_recmode_state *state = talloc_get_type(p, struct ctdb_set_recmode_state);
410
411         ctdb_start_monitoring(ctdb);
412
413         if (status == 0) {
414                 ctdb->recovery_mode = state->recmode;
415         } else {
416                 DEBUG(0,(__location__ " recovered event script failed (status %d)\n", status));
417         }
418
419         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
420         talloc_free(state);
421 }
422
423 /*
424   set the recovery mode
425  */
426 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
427                                  struct ctdb_req_control *c,
428                                  TDB_DATA indata, bool *async_reply,
429                                  const char **errormsg)
430 {
431         uint32_t recmode = *(uint32_t *)indata.dptr;
432         int ret;
433         struct ctdb_set_recmode_state *state;
434
435         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
436                 DEBUG(0,("Attempt to change recovery mode to %u when not frozen\n", 
437                          recmode));
438                 (*errormsg) = "Cannot change recovery mode while not frozen";
439                 return -1;
440         }
441
442         if (recmode != CTDB_RECOVERY_NORMAL ||
443             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
444                 ctdb->recovery_mode = recmode;
445                 return 0;
446         }
447
448         /* some special handling when ending recovery mode */
449         state = talloc(ctdb, struct ctdb_set_recmode_state);
450         CTDB_NO_MEMORY(ctdb, state);
451
452         /* we should not be able to get the lock on the nodes list, as it should be
453            held by the recovery master */
454         if (ctdb_recovery_lock(ctdb, false)) {
455                 DEBUG(0,("ERROR: recovery lock file %s not locked when recovering!\n",
456                          ctdb->recovery_lock_file));
457                 return -1;
458         }       
459
460         state->c = talloc_steal(state, c);
461         state->recmode = recmode;
462         
463         ctdb_stop_monitoring(ctdb);
464
465         /* call the events script to tell all subsystems that we have recovered */
466         ret = ctdb_event_script_callback(ctdb, 
467                                          timeval_current_ofs(ctdb->tunable.script_timeout, 0),
468                                          state, 
469                                          ctdb_recovered_callback, 
470                                          state, "recovered");
471         if (ret != 0) {
472                 return ret;
473         }
474         *async_reply = true;
475
476         return 0;
477 }
478
479 /*
480   callback for ctdb_control_max_rsn
481  */
482 static int traverse_max_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
483 {
484         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
485         uint64_t *max_rsn = (uint64_t *)p;
486
487         if (data.dsize >= sizeof(*h)) {
488                 (*max_rsn) = MAX(*max_rsn, h->rsn);
489         }
490         return 0;
491 }
492
493 /*
494   get max rsn across an entire db
495  */
496 int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
497 {
498         struct ctdb_db_context *ctdb_db;
499         uint32_t db_id = *(uint32_t *)indata.dptr;
500         uint64_t max_rsn = 0;
501         int ret;
502
503         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
504                 DEBUG(0,("rejecting ctdb_control_max_rsn when not frozen\n"));
505                 return -1;
506         }
507
508         ctdb_db = find_ctdb_db(ctdb, db_id);
509         if (!ctdb_db) {
510                 DEBUG(0,(__location__ " Unknown db\n"));
511                 return -1;
512         }
513
514         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
515                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
516                 return -1;
517         }
518
519         ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_max_rsn, &max_rsn);
520         if (ret < 0) {
521                 DEBUG(0,(__location__ " traverse failed in ctdb_control_max_rsn\n"));
522                 return -1;
523         }
524
525         ctdb_lock_all_databases_unmark(ctdb);
526
527         outdata->dptr = (uint8_t *)talloc(outdata, uint64_t);
528         if (!outdata->dptr) {
529                 return -1;
530         }
531         (*(uint64_t *)outdata->dptr) = max_rsn;
532         outdata->dsize = sizeof(uint64_t);
533
534         return 0;
535 }
536
537
538 /*
539   callback for ctdb_control_set_rsn_nonempty
540  */
541 static int traverse_set_rsn_nonempty(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
542 {
543         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
544         uint64_t *rsn = (uint64_t *)p;
545
546         if (data.dsize > sizeof(*h)) {
547                 h->rsn = *rsn;
548                 if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
549                         return -1;
550                 }
551         }
552         return 0;
553 }
554
555 /*
556   set rsn for all non-empty records in a database to a given rsn
557  */
558 int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
559 {
560         struct ctdb_control_set_rsn_nonempty *p = (struct ctdb_control_set_rsn_nonempty *)indata.dptr;
561         struct ctdb_db_context *ctdb_db;
562         int ret;
563
564         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
565                 DEBUG(0,("rejecting ctdb_control_set_rsn_nonempty when not frozen\n"));
566                 return -1;
567         }
568
569         ctdb_db = find_ctdb_db(ctdb, p->db_id);
570         if (!ctdb_db) {
571                 DEBUG(0,(__location__ " Unknown db\n"));
572                 return -1;
573         }
574
575         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
576                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
577                 return -1;
578         }
579
580         ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_set_rsn_nonempty, &p->rsn);
581         if (ret < 0) {
582                 DEBUG(0,(__location__ " traverse failed in ctdb_control_set_rsn_nonempty\n"));
583                 return -1;
584         }
585
586         ctdb_lock_all_databases_unmark(ctdb);
587
588         return 0;
589 }
590
591
592 /*
593   callback for ctdb_control_delete_low_rsn
594  */
595 static int traverse_delete_low_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
596 {
597         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
598         uint64_t *rsn = (uint64_t *)p;
599
600         if (data.dsize < sizeof(*h) || h->rsn < *rsn) {
601                 if (tdb_delete(tdb, key) != 0) {
602                         return -1;
603                 }
604         }
605         return 0;
606 }
607
608 /*
609   delete any records with a rsn < the given rsn
610  */
611 int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
612 {
613         struct ctdb_control_delete_low_rsn *p = (struct ctdb_control_delete_low_rsn *)indata.dptr;
614         struct ctdb_db_context *ctdb_db;
615         int ret;
616
617         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
618                 DEBUG(0,("rejecting ctdb_control_delete_low_rsn when not frozen\n"));
619                 return -1;
620         }
621
622         ctdb_db = find_ctdb_db(ctdb, p->db_id);
623         if (!ctdb_db) {
624                 DEBUG(0,(__location__ " Unknown db\n"));
625                 return -1;
626         }
627
628         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
629                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
630                 return -1;
631         }
632
633         ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_delete_low_rsn, &p->rsn);
634         if (ret < 0) {
635                 DEBUG(0,(__location__ " traverse failed in ctdb_control_delete_low_rsn\n"));
636                 return -1;
637         }
638
639         ctdb_lock_all_databases_unmark(ctdb);
640
641         return 0;
642 }
643
644
645 /*
646   try and get the recovery lock in shared storage - should only work
647   on the recovery master recovery daemon. Anywhere else is a bug
648  */
649 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
650 {
651         struct flock lock;
652
653         if (ctdb->recovery_lock_fd != -1) {
654                 close(ctdb->recovery_lock_fd);
655         }
656         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
657         if (ctdb->recovery_lock_fd == -1) {
658                 DEBUG(0,("Unable to open %s - (%s)\n", 
659                          ctdb->recovery_lock_file, strerror(errno)));
660                 return false;
661         }
662
663         lock.l_type = F_WRLCK;
664         lock.l_whence = SEEK_SET;
665         lock.l_start = 0;
666         lock.l_len = 1;
667         lock.l_pid = 0;
668
669         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
670                 return false;
671         }
672
673         if (!keep) {
674                 close(ctdb->recovery_lock_fd);
675                 ctdb->recovery_lock_fd = -1;
676         }
677
678         return true;
679 }