ctdb-locking: Get tdb open flags from tdb instead of re-calculating
[samba.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30 #include "lib/util/samba_util.h"
31 #include "lib/util/sys_rw.h"
32
33 #include "ctdb_private.h"
34
35 #include "common/common.h"
36 #include "common/logging.h"
37
38 /*
39  * Non-blocking Locking API
40  *
41  * 1. Create a child process to do blocking locks.
42  * 2. Once the locks are obtained, signal parent process via fd.
43  * 3. Invoke registered callback routine with locking status.
44  * 4. If the child process cannot get locks within certain time,
45  *    execute an external script to debug.
46  *
47  * ctdb_lock_record()      - get a lock on a record
48  * ctdb_lock_db()          - get a lock on a DB
49  *
50  *  auto_mark              - whether to mark/unmark DBs in before/after callback
51  *                           = false is used for freezing databases for
52  *                           recovery since the recovery cannot start till
53  *                           databases are locked on all the nodes.
54  *                           = true is used for record locks.
55  */
56
/* The kinds of lock a request can ask the helper process for. */
enum lock_type {
        LOCK_RECORD,
        LOCK_DB,
};

/* Name of each lock type, indexed by enum lock_type. */
static const char * const lock_type_str[] = {
        [LOCK_RECORD] = "lock_record",
        [LOCK_DB]     = "lock_db",
};
66
67 struct lock_request;
68
69 /* lock_context is the common part for a lock request */
70 struct lock_context {
71         struct lock_context *next, *prev;
72         enum lock_type type;
73         struct ctdb_context *ctdb;
74         struct ctdb_db_context *ctdb_db;
75         TDB_DATA key;
76         uint32_t priority;
77         bool auto_mark;
78         struct lock_request *request;
79         pid_t child;
80         int fd[2];
81         struct tevent_fd *tfd;
82         struct tevent_timer *ttimer;
83         struct timeval start_time;
84         uint32_t key_hash;
85         bool can_schedule;
86 };
87
/*
 * lock_request is the client specific part for a lock request.
 *
 * It is the handle returned to the caller of ctdb_lock_record() and
 * ctdb_lock_db().
 */
struct lock_request {
        /* Associated lock context; NULL once the two have been detached */
        struct lock_context *lctx;

        /* Invoked with private_data and the lock status (true = locked) */
        void (*callback)(void *private_data, bool locked);
        void *private_data;
};
94
95
96 int ctdb_db_iterator(struct ctdb_context *ctdb, ctdb_db_handler_t handler,
97                      void *private_data)
98 {
99         struct ctdb_db_context *ctdb_db;
100         int ret;
101
102         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
103                 ret = handler(ctdb_db, private_data);
104                 if (ret != 0) {
105                         return -1;
106                 }
107         }
108
109         return 0;
110 }
111
112 /*
113  * lock all databases - mark only
114  */
115 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db,
116                                 void *private_data)
117 {
118         int tdb_transaction_write_lock_mark(struct tdb_context *);
119
120         DEBUG(DEBUG_INFO, ("marking locked database %s\n", ctdb_db->db_name));
121
122         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
123                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
124                                   ctdb_db->db_name));
125                 return -1;
126         }
127
128         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
129                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
130                                   ctdb_db->db_name));
131                 return -1;
132         }
133
134         return 0;
135 }
136
137 int ctdb_lockdb_mark(struct ctdb_db_context *ctdb_db)
138 {
139         if (!ctdb_db_frozen(ctdb_db)) {
140                 DEBUG(DEBUG_ERR,
141                       ("Attempt to mark database locked when not frozen\n"));
142                 return -1;
143         }
144
145         return db_lock_mark_handler(ctdb_db, NULL);
146 }
147
148 /*
149  * lock all databases - unmark only
150  */
151 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db,
152                                   void *private_data)
153 {
154         int tdb_transaction_write_lock_unmark(struct tdb_context *);
155
156         DEBUG(DEBUG_INFO, ("unmarking locked database %s\n", ctdb_db->db_name));
157
158         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
159                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
160                                   ctdb_db->db_name));
161                 return -1;
162         }
163
164         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
165                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
166                                   ctdb_db->db_name));
167                 return -1;
168         }
169
170         return 0;
171 }
172
173 int ctdb_lockdb_unmark(struct ctdb_db_context *ctdb_db)
174 {
175         if (!ctdb_db_frozen(ctdb_db)) {
176                 DEBUG(DEBUG_ERR,
177                       ("Attempt to unmark database locked when not frozen\n"));
178                 return -1;
179         }
180
181         return db_lock_unmark_handler(ctdb_db, NULL);
182 }
183
184 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
185
186 /*
187  * Destructor to kill the child locking process
188  */
189 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
190 {
191         if (lock_ctx->request) {
192                 lock_ctx->request->lctx = NULL;
193         }
194         if (lock_ctx->child > 0) {
195                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGTERM);
196                 if (lock_ctx->type == LOCK_RECORD) {
197                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
198                 } else {
199                         DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
200                 }
201                 if (lock_ctx->ctdb_db) {
202                         lock_ctx->ctdb_db->lock_num_current--;
203                 }
204                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
205                 if (lock_ctx->ctdb_db) {
206                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
207                 }
208         } else {
209                 if (lock_ctx->type == LOCK_RECORD) {
210                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
211                 } else {
212                         DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
213                 }
214                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
215                 if (lock_ctx->ctdb_db) {
216                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
217                 }
218         }
219
220         ctdb_lock_schedule(lock_ctx->ctdb);
221
222         return 0;
223 }
224
225
226 /*
227  * Destructor to remove lock request
228  */
229 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
230 {
231         if (lock_request->lctx == NULL) {
232                 return 0;
233         }
234
235         lock_request->lctx->request = NULL;
236         TALLOC_FREE(lock_request->lctx);
237
238         return 0;
239 }
240
241 /*
242  * Process all the callbacks waiting for lock
243  *
244  * If lock has failed, callback is executed with locked=false
245  */
246 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
247 {
248         struct lock_request *request;
249         bool auto_mark = lock_ctx->auto_mark;
250
251         if (auto_mark && locked) {
252                 switch (lock_ctx->type) {
253                 case LOCK_RECORD:
254                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
255                         break;
256
257                 case LOCK_DB:
258                         ctdb_lockdb_mark(lock_ctx->ctdb_db);
259                         break;
260                 }
261         }
262
263         request = lock_ctx->request;
264         if (auto_mark) {
265                 /* Since request may be freed in the callback, unset the lock
266                  * context, so request destructor will not free lock context.
267                  */
268                 request->lctx = NULL;
269         }
270
271         /* Since request may be freed in the callback, unset the request */
272         lock_ctx->request = NULL;
273
274         request->callback(request->private_data, locked);
275
276         if (!auto_mark) {
277                 return;
278         }
279
280         if (locked) {
281                 switch (lock_ctx->type) {
282                 case LOCK_RECORD:
283                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
284                         break;
285
286                 case LOCK_DB:
287                         ctdb_lockdb_unmark(lock_ctx->ctdb_db);
288                         break;
289                 }
290         }
291
292         talloc_free(lock_ctx);
293 }
294
295
/*
 * Map a lock latency in seconds to a histogram bucket index.
 *
 * Bucket i collects latencies below the i-th upper bound:
 * 1ms, 10ms, 100ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s.
 * Anything at or above 64s lands in bucket 10.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        const double upper_bound[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int num_bounds = sizeof(upper_bound) / sizeof(upper_bound[0]);
        int id;

        for (id = 0; id < num_bounds; id++) {
                if (t < upper_bound[id]) {
                        break;
                }
        }

        return id;
}
327
328 /*
329  * Callback routine when the required locks are obtained.
330  * Called from parent context
331  */
332 static void ctdb_lock_handler(struct tevent_context *ev,
333                             struct tevent_fd *tfd,
334                             uint16_t flags,
335                             void *private_data)
336 {
337         struct lock_context *lock_ctx;
338         char c;
339         bool locked;
340         double t;
341         int id;
342
343         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
344
345         /* cancel the timeout event */
346         TALLOC_FREE(lock_ctx->ttimer);
347
348         t = timeval_elapsed(&lock_ctx->start_time);
349         id = lock_bucket_id(t);
350
351         /* Read the status from the child process */
352         if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
353                 locked = false;
354         } else {
355                 locked = (c == 0 ? true : false);
356         }
357
358         /* Update statistics */
359         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
360         if (lock_ctx->ctdb_db) {
361                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
362         }
363
364         if (locked) {
365                 if (lock_ctx->ctdb_db) {
366                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
367                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
368                                             lock_type_str[lock_ctx->type], locks.latency,
369                                             lock_ctx->start_time);
370
371                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
372                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
373                 }
374         } else {
375                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
376                 if (lock_ctx->ctdb_db) {
377                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
378                 }
379         }
380
381         process_callbacks(lock_ctx, locked);
382 }
383
/*
 * Per-key record of the last "unable to get RECORD lock" warning, used
 * by lock_log_skip() to avoid logging the same key over and over.
 * A pointer to the entry is what gets stored in the lock_log hash.
 */
struct lock_log_entry {
        /* Hash the entry is stored in; needed by the cleanup timer */
        struct db_hash_context *lock_log;
        /* Private copy of the record key (owned by the entry) */
        TDB_DATA key;
        /* Elapsed whole seconds reported by the most recent warning */
        unsigned long log_sec;
        /* Expiry timer that runs lock_log_cleanup(); NULL once fired */
        struct tevent_timer *timer;
};
390
391 static int lock_log_fetch_parser(uint8_t *keybuf, size_t keylen,
392                                  uint8_t *databuf, size_t datalen,
393                                  void *private_data)
394 {
395         struct lock_log_entry **entry =
396                 (struct lock_log_entry **)private_data;
397
398         if (datalen != sizeof(struct lock_log_entry *)) {
399                 return EINVAL;
400         }
401
402         *entry = talloc_get_type_abort(*(void **)databuf,
403                                        struct lock_log_entry);
404         return 0;
405 }
406
407 static void lock_log_cleanup(struct tevent_context *ev,
408                              struct tevent_timer *ttimer,
409                              struct timeval current_time,
410                              void *private_data)
411 {
412         struct lock_log_entry *entry = talloc_get_type_abort(
413                 private_data, struct lock_log_entry);
414         int ret;
415
416         entry->timer = NULL;
417
418         ret = db_hash_delete(entry->lock_log, entry->key.dptr,
419                              entry->key.dsize);
420         if (ret != 0) {
421                 return;
422         }
423         talloc_free(entry);
424 }
425
426 static bool lock_log_skip(struct tevent_context *ev,
427                           struct db_hash_context *lock_log,
428                           TDB_DATA key, unsigned long elapsed_sec)
429 {
430         struct lock_log_entry *entry = NULL;
431         int ret;
432
433         ret = db_hash_fetch(lock_log, key.dptr, key.dsize,
434                             lock_log_fetch_parser, &entry);
435         if (ret == ENOENT) {
436
437                 entry = talloc_zero(lock_log, struct lock_log_entry);
438                 if (entry == NULL) {
439                         goto fail;
440                 }
441
442                 entry->lock_log = lock_log;
443
444                 entry->key.dptr = talloc_memdup(entry, key.dptr, key.dsize);
445                 if (entry->key.dptr == NULL) {
446                         talloc_free(entry);
447                         goto fail;
448                 }
449                 entry->key.dsize = key.dsize;
450
451                 entry->log_sec = elapsed_sec;
452                 entry->timer = tevent_add_timer(ev, entry,
453                                                 timeval_current_ofs(30, 0),
454                                                 lock_log_cleanup, entry);
455                 if (entry->timer == NULL) {
456                         talloc_free(entry);
457                         goto fail;
458                 }
459
460                 ret = db_hash_add(lock_log, key.dptr, key.dsize,
461                                   (uint8_t *)&entry,
462                                   sizeof(struct lock_log_entry *));
463                 if (ret != 0) {
464                         talloc_free(entry);
465                         goto fail;
466                 }
467
468                 return false;
469
470         } else if (ret == EINVAL) {
471
472                 ret = db_hash_delete(lock_log, key.dptr, key.dsize);
473                 if (ret != 0) {
474                         goto fail;
475                 }
476
477                 return false;
478
479         } else if (ret == 0) {
480
481                 if (elapsed_sec <= entry->log_sec) {
482                         return true;
483                 }
484
485                 entry->log_sec = elapsed_sec;
486
487                 TALLOC_FREE(entry->timer);
488                 entry->timer = tevent_add_timer(ev, entry,
489                                                 timeval_current_ofs(30, 0),
490                                                 lock_log_cleanup, entry);
491                 if (entry->timer == NULL) {
492                         ret = db_hash_delete(lock_log, key.dptr, key.dsize);
493                         if (ret != 0) {
494                                 goto fail;
495                         }
496                         talloc_free(entry);
497                 }
498
499                 return false;
500         }
501
502
503 fail:
504         return false;
505
506 }
507
508 /*
509  * Callback routine when required locks are not obtained within timeout
510  * Called from parent context
511  */
512 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
513                                     struct tevent_timer *ttimer,
514                                     struct timeval current_time,
515                                     void *private_data)
516 {
517         static char debug_locks[PATH_MAX+1] = "";
518         struct lock_context *lock_ctx;
519         struct ctdb_context *ctdb;
520         pid_t pid;
521         double elapsed_time;
522         bool skip;
523         char *keystr;
524
525         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
526         ctdb = lock_ctx->ctdb;
527
528         elapsed_time = timeval_elapsed(&lock_ctx->start_time);
529
530         /* For database locks, always log */
531         if (lock_ctx->type == LOCK_DB) {
532                 DEBUG(DEBUG_WARNING,
533                       ("Unable to get DB lock on database %s for "
534                        "%.0lf seconds\n",
535                        lock_ctx->ctdb_db->db_name, elapsed_time));
536                 goto lock_debug;
537         }
538
539         /* For record locks, check if we have already logged */
540         skip = lock_log_skip(ev, lock_ctx->ctdb_db->lock_log,
541                              lock_ctx->key, (unsigned long)elapsed_time);
542         if (skip) {
543                 goto skip_lock_debug;
544         }
545
546         keystr = hex_encode_talloc(lock_ctx, lock_ctx->key.dptr,
547                                    lock_ctx->key.dsize);
548         DEBUG(DEBUG_WARNING,
549               ("Unable to get RECORD lock on database %s for %.0lf seconds"
550                " (key %s)\n",
551                lock_ctx->ctdb_db->db_name, elapsed_time,
552                keystr ? keystr : ""));
553         TALLOC_FREE(keystr);
554
555         /* If a node stopped/banned, don't spam the logs */
556         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
557                 goto skip_lock_debug;
558         }
559
560 lock_debug:
561
562         if (ctdb_set_helper("lock debugging helper",
563                             debug_locks, sizeof(debug_locks),
564                             "CTDB_DEBUG_LOCKS",
565                             getenv("CTDB_BASE"), "debug_locks.sh")) {
566                 pid = vfork();
567                 if (pid == 0) {
568                         execl(debug_locks, debug_locks, NULL);
569                         _exit(0);
570                 }
571                 ctdb_track_child(ctdb, pid);
572         } else {
573                 DEBUG(DEBUG_WARNING,
574                       (__location__
575                        " Unable to setup lock debugging\n"));
576         }
577
578 skip_lock_debug:
579
580         /* reset the timeout timer */
581         // talloc_free(lock_ctx->ttimer);
582         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
583                                             lock_ctx,
584                                             timeval_current_ofs(10, 0),
585                                             ctdb_lock_timeout_handler,
586                                             (void *)lock_ctx);
587 }
588
589 static bool lock_helper_args(TALLOC_CTX *mem_ctx,
590                              struct lock_context *lock_ctx, int fd,
591                              int *argc, const char ***argv)
592 {
593         const char **args = NULL;
594         int nargs = 0, i;
595
596         switch (lock_ctx->type) {
597         case LOCK_RECORD:
598                 nargs = 6;
599                 break;
600
601         case LOCK_DB:
602                 nargs = 5;
603                 break;
604         }
605
606         /* Add extra argument for null termination */
607         nargs++;
608
609         args = talloc_array(mem_ctx, const char *, nargs);
610         if (args == NULL) {
611                 return false;
612         }
613
614         args[0] = talloc_asprintf(args, "%d", getpid());
615         args[1] = talloc_asprintf(args, "%d", fd);
616
617         switch (lock_ctx->type) {
618         case LOCK_RECORD:
619                 args[2] = talloc_strdup(args, "RECORD");
620                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
621                 args[4] = talloc_asprintf(args, "0x%x",
622                                 tdb_get_flags(lock_ctx->ctdb_db->ltdb->tdb));
623                 if (lock_ctx->key.dsize == 0) {
624                         args[5] = talloc_strdup(args, "NULL");
625                 } else {
626                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
627                 }
628                 break;
629
630         case LOCK_DB:
631                 args[2] = talloc_strdup(args, "DB");
632                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
633                 args[4] = talloc_asprintf(args, "0x%x",
634                                 tdb_get_flags(lock_ctx->ctdb_db->ltdb->tdb));
635                 break;
636         }
637
638         /* Make sure last argument is NULL */
639         args[nargs-1] = NULL;
640
641         for (i=0; i<nargs-1; i++) {
642                 if (args[i] == NULL) {
643                         talloc_free(args);
644                         return false;
645                 }
646         }
647
648         *argc = nargs;
649         *argv = args;
650         return true;
651 }
652
653 /*
654  * Find a lock request that can be scheduled
655  */
656 static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
657 {
658         struct lock_context *lock_ctx, *next_ctx;
659         struct ctdb_db_context *ctdb_db;
660
661         /* First check if there are database lock requests */
662
663         for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
664              lock_ctx = next_ctx) {
665
666                 if (lock_ctx->request != NULL) {
667                         /* Found a lock context with a request */
668                         return lock_ctx;
669                 }
670
671                 next_ctx = lock_ctx->next;
672
673                 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
674                                    "request\n"));
675                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
676                 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
677                 if (lock_ctx->ctdb_db) {
678                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
679                                                locks.num_pending);
680                 }
681                 talloc_free(lock_ctx);
682         }
683
684         /* Next check database queues */
685         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
686                 if (ctdb_db->lock_num_current ==
687                     ctdb->tunable.lock_processes_per_db) {
688                         continue;
689                 }
690
691                 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
692                      lock_ctx = next_ctx) {
693
694                         next_ctx = lock_ctx->next;
695
696                         if (lock_ctx->request != NULL) {
697                                 return lock_ctx;
698                         }
699
700                         DEBUG(DEBUG_INFO, ("Removing lock context without "
701                                            "lock request\n"));
702                         DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
703                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
704                         CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
705                         talloc_free(lock_ctx);
706                 }
707         }
708
709         return NULL;
710 }
711
712 /*
713  * Schedule a new lock child process
714  * Set up callback handler and timeout handler
715  */
716 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
717 {
718         struct lock_context *lock_ctx;
719         int ret, argc;
720         TALLOC_CTX *tmp_ctx;
721         static char prog[PATH_MAX+1] = "";
722         const char **args;
723
724         if (!ctdb_set_helper("lock helper",
725                              prog, sizeof(prog),
726                              "CTDB_LOCK_HELPER",
727                              CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
728                 ctdb_die(ctdb, __location__
729                          " Unable to set lock helper\n");
730         }
731
732         /* Find a lock context with requests */
733         lock_ctx = ctdb_find_lock_context(ctdb);
734         if (lock_ctx == NULL) {
735                 return;
736         }
737
738         lock_ctx->child = -1;
739         ret = pipe(lock_ctx->fd);
740         if (ret != 0) {
741                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
742                 return;
743         }
744
745         set_close_on_exec(lock_ctx->fd[0]);
746
747         /* Create data for child process */
748         tmp_ctx = talloc_new(lock_ctx);
749         if (tmp_ctx == NULL) {
750                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
751                 close(lock_ctx->fd[0]);
752                 close(lock_ctx->fd[1]);
753                 return;
754         }
755
756         if (! ctdb->do_setsched) {
757                 ret = setenv("CTDB_NOSETSCHED", "1", 1);
758                 if (ret != 0) {
759                         DEBUG(DEBUG_WARNING,
760                               ("Failed to set CTDB_NOSETSCHED variable\n"));
761                 }
762         }
763
764         /* Create arguments for lock helper */
765         if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
766                               &argc, &args)) {
767                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
768                 close(lock_ctx->fd[0]);
769                 close(lock_ctx->fd[1]);
770                 talloc_free(tmp_ctx);
771                 return;
772         }
773
774         lock_ctx->child = ctdb_vfork_exec(lock_ctx, ctdb, prog, argc,
775                                           (const char **)args);
776         if (lock_ctx->child == -1) {
777                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
778                 close(lock_ctx->fd[0]);
779                 close(lock_ctx->fd[1]);
780                 talloc_free(tmp_ctx);
781                 return;
782         }
783
784         /* Parent process */
785         close(lock_ctx->fd[1]);
786
787         talloc_free(tmp_ctx);
788
789         /* Set up timeout handler */
790         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
791                                             lock_ctx,
792                                             timeval_current_ofs(10, 0),
793                                             ctdb_lock_timeout_handler,
794                                             (void *)lock_ctx);
795         if (lock_ctx->ttimer == NULL) {
796                 ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
797                 lock_ctx->child = -1;
798                 close(lock_ctx->fd[0]);
799                 return;
800         }
801
802         /* Set up callback */
803         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
804                                       lock_ctx,
805                                       lock_ctx->fd[0],
806                                       TEVENT_FD_READ,
807                                       ctdb_lock_handler,
808                                       (void *)lock_ctx);
809         if (lock_ctx->tfd == NULL) {
810                 TALLOC_FREE(lock_ctx->ttimer);
811                 ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
812                 lock_ctx->child = -1;
813                 close(lock_ctx->fd[0]);
814                 return;
815         }
816         tevent_fd_set_auto_close(lock_ctx->tfd);
817
818         /* Move the context from pending to current */
819         if (lock_ctx->type == LOCK_RECORD) {
820                 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
821                 DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx);
822         } else {
823                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
824                 DLIST_ADD_END(ctdb->lock_current, lock_ctx);
825         }
826         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
827         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
828         if (lock_ctx->ctdb_db) {
829                 lock_ctx->ctdb_db->lock_num_current++;
830                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
831                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
832         }
833 }
834
835
836 /*
837  * Lock record / db depending on type
838  */
839 static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
840                                                struct ctdb_context *ctdb,
841                                                struct ctdb_db_context *ctdb_db,
842                                                TDB_DATA key,
843                                                uint32_t priority,
844                                                void (*callback)(void *, bool),
845                                                void *private_data,
846                                                enum lock_type type,
847                                                bool auto_mark)
848 {
849         struct lock_context *lock_ctx = NULL;
850         struct lock_request *request;
851
852         if (callback == NULL) {
853                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
854                 return NULL;
855         }
856
857         lock_ctx = talloc_zero(ctdb, struct lock_context);
858         if (lock_ctx == NULL) {
859                 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
860                 return NULL;
861         }
862
863         if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
864                 talloc_free(lock_ctx);
865                 return NULL;
866         }
867
868         lock_ctx->type = type;
869         lock_ctx->ctdb = ctdb;
870         lock_ctx->ctdb_db = ctdb_db;
871         lock_ctx->key.dsize = key.dsize;
872         if (key.dsize > 0) {
873                 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
874                 if (lock_ctx->key.dptr == NULL) {
875                         DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
876                         talloc_free(lock_ctx);
877                         talloc_free(request);
878                         return NULL;
879                 }
880                 lock_ctx->key_hash = ctdb_hash(&key);
881         } else {
882                 lock_ctx->key.dptr = NULL;
883         }
884         lock_ctx->priority = priority;
885         lock_ctx->auto_mark = auto_mark;
886
887         lock_ctx->request = request;
888         lock_ctx->child = -1;
889
890         /* Non-record locks are required by recovery and should be scheduled
891          * immediately, so keep them at the head of the pending queue.
892          */
893         if (lock_ctx->type == LOCK_RECORD) {
894                 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx);
895         } else {
896                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx);
897         }
898         CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
899         if (ctdb_db) {
900                 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
901         }
902
903         /* Start the timer when we activate the context */
904         lock_ctx->start_time = timeval_current();
905
906         request->lctx = lock_ctx;
907         request->callback = callback;
908         request->private_data = private_data;
909
910         talloc_set_destructor(request, ctdb_lock_request_destructor);
911         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
912
913         ctdb_lock_schedule(ctdb);
914
915         return request;
916 }
917
918
919 /*
920  * obtain a lock on a record in a database
921  */
922 struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
923                                       struct ctdb_db_context *ctdb_db,
924                                       TDB_DATA key,
925                                       bool auto_mark,
926                                       void (*callback)(void *, bool),
927                                       void *private_data)
928 {
929         return ctdb_lock_internal(mem_ctx,
930                                   ctdb_db->ctdb,
931                                   ctdb_db,
932                                   key,
933                                   0,
934                                   callback,
935                                   private_data,
936                                   LOCK_RECORD,
937                                   auto_mark);
938 }
939
940
941 /*
942  * obtain a lock on a database
943  */
944 struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
945                                   struct ctdb_db_context *ctdb_db,
946                                   bool auto_mark,
947                                   void (*callback)(void *, bool),
948                                   void *private_data)
949 {
950         return ctdb_lock_internal(mem_ctx,
951                                   ctdb_db->ctdb,
952                                   ctdb_db,
953                                   tdb_null,
954                                   0,
955                                   callback,
956                                   private_data,
957                                   LOCK_DB,
958                                   auto_mark);
959 }