locking: Move function find_lock_context() before ctdb_lock_schedule()
[samba.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Limit on the number of lock contexts that may have a helper running at
 * the same time (compared against ctdb->lock_num_current when scheduling).
 *
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB (100)
48
/* Kind of lock a lock context asks for; also used to index lock_type_str[] */
enum lock_type {
        LOCK_RECORD,            /* a single record (chain) of one database */
        LOCK_DB,                /* one whole database */
        LOCK_ALLDB_PRIO,        /* all databases of one priority */
        LOCK_ALLDB,             /* all databases */
};
55
/*
 * Human readable name for each enum lock_type value, in enum order.
 * Used as the label when lock latency is recorded.
 */
static const char * const lock_type_str[] = {
        "lock_record",          /* LOCK_RECORD */
        "lock_db",              /* LOCK_DB */
        "lock_alldb_prio",      /* LOCK_ALLDB_PRIO */
        "lock_alldb",           /* LOCK_ALLDB */
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82 };
83
/*
 * lock_request is the client specific part for a lock request.
 *
 * Each request belongs to exactly one lock_context (lctx) and is linked
 * into that context's req_queue.
 */
struct lock_request {
        struct lock_request *next;
        struct lock_request *prev;
        struct lock_context *lctx;              /* context this request waits on */
        void (*callback)(void *, bool);         /* called with (private_data, locked) */
        void *private_data;                     /* opaque argument for callback */
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
117                             uint32_t priority,
118                             void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases - mark only
160  */
161 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                                 void *private_data)
163 {
164         int tdb_transaction_write_lock_mark(struct tdb_context *);
165
166         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
167                            ctdb_db->db_name, priority));
168
169         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
170                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
171                                   ctdb_db->db_name));
172                 return -1;
173         }
174
175         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
176                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
177                                   ctdb_db->db_name));
178                 return -1;
179         }
180
181         return 0;
182 }
183
184 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
185 {
186         /*
187          * This function is only used by the main dameon during recovery.
188          * At this stage, the databases have already been locked, by a
189          * dedicated child process. The freeze_mode variable is used to track
190          * whether the actual locks are held by the child process or not.
191          */
192
193         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
194                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
195                 return -1;
196         }
197
198         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
199 }
200
201 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
202 {
203         uint32_t priority;
204
205         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
206                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
207                         return -1;
208                 }
209         }
210
211         return 0;
212 }
213
214
215 /*
216  * lock all databases - unmark only
217  */
218 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
219                                   void *private_data)
220 {
221         int tdb_transaction_write_lock_unmark(struct tdb_context *);
222
223         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
224                            ctdb_db->db_name, priority));
225
226         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
227                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
228                                   ctdb_db->db_name));
229                 return -1;
230         }
231
232         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
233                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
234                                   ctdb_db->db_name));
235                 return -1;
236         }
237
238         return 0;
239 }
240
241 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
242 {
243         /*
244          * This function is only used by the main dameon during recovery.
245          * At this stage, the databases have already been locked, by a
246          * dedicated child process. The freeze_mode variable is used to track
247          * whether the actual locks are held by the child process or not.
248          */
249
250         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
251                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
252                 return -1;
253         }
254
255         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
256 }
257
258 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
259 {
260         uint32_t priority;
261
262         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
263                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
273
274 /*
275  * Destructor to kill the child locking process
276  */
277 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
278 {
279         if (lock_ctx->child > 0) {
280                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
281                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
282                 lock_ctx->ctdb->lock_num_current--;
283                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
284                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
285                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
286                 }
287         } else {
288                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
289                 lock_ctx->ctdb->lock_num_pending--;
290                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
291                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
292                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
293                 }
294         }
295
296         ctdb_lock_schedule(lock_ctx->ctdb);
297
298         return 0;
299 }
300
301
302 /*
303  * Destructor to remove lock request
304  */
305 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
306 {
307         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
308         return 0;
309 }
310
311
312 void ctdb_lock_free_request_context(struct lock_request *lock_req)
313 {
314         struct lock_context *lock_ctx;
315
316         lock_ctx = lock_req->lctx;
317         talloc_free(lock_req);
318         talloc_free(lock_ctx);
319 }
320
321
322 /*
323  * Process all the callbacks waiting for lock
324  *
325  * If lock has failed, callback is executed with locked=false
326  */
327 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
328 {
329         struct lock_request *request, *next;
330
331         if (lock_ctx->auto_mark && locked) {
332                 switch (lock_ctx->type) {
333                 case LOCK_RECORD:
334                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
335                         break;
336
337                 case LOCK_DB:
338                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
339                         break;
340
341                 case LOCK_ALLDB_PRIO:
342                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
343                         break;
344
345                 case LOCK_ALLDB:
346                         ctdb_lockall_mark(lock_ctx->ctdb);
347                         break;
348                 }
349         }
350
351         /* Iterate through all callbacks */
352         request = lock_ctx->req_queue;
353         while (request) {
354                 if (lock_ctx->auto_mark) {
355                         /* Reset the destructor, so request is not removed from the list */
356                         talloc_set_destructor(request, NULL);
357                 }
358
359                 /* In case, callback frees the request, store next */
360                 next = request->next;
361                 request->callback(request->private_data, locked);
362                 request = next;
363         }
364
365         if (lock_ctx->auto_mark && locked) {
366                 switch (lock_ctx->type) {
367                 case LOCK_RECORD:
368                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
369                         break;
370
371                 case LOCK_DB:
372                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
373                         break;
374
375                 case LOCK_ALLDB_PRIO:
376                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
377                         break;
378
379                 case LOCK_ALLDB:
380                         ctdb_lockall_unmark(lock_ctx->ctdb);
381                         break;
382                 }
383         }
384 }
385
386
/*
 * Map an elapsed time in seconds to a latency bucket index.
 *
 * Bucket i holds times below the i-th upper bound listed here; anything at
 * or above the last bound (64 seconds) lands in bucket 10.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        const double upper_bound[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int num_bounds = (int)(sizeof(upper_bound) / sizeof(upper_bound[0]));
        int bucket;

        for (bucket = 0; bucket < num_bounds; bucket++) {
                if (t < upper_bound[bucket]) {
                        break;
                }
        }

        return bucket;
}
418
419 /*
420  * Callback routine when the required locks are obtained.
421  * Called from parent context
422  */
423 static void ctdb_lock_handler(struct tevent_context *ev,
424                             struct tevent_fd *tfd,
425                             uint16_t flags,
426                             void *private_data)
427 {
428         struct lock_context *lock_ctx;
429         TALLOC_CTX *tmp_ctx = NULL;
430         char c;
431         bool locked;
432         double t;
433         int id;
434
435         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
436
437         /* cancel the timeout event */
438         if (lock_ctx->ttimer) {
439                 TALLOC_FREE(lock_ctx->ttimer);
440         }
441
442         t = timeval_elapsed(&lock_ctx->start_time);
443         id = lock_bucket_id(t);
444
445         if (lock_ctx->auto_mark) {
446                 tmp_ctx = talloc_new(ev);
447                 talloc_steal(tmp_ctx, lock_ctx);
448         }
449
450         /* Read the status from the child process */
451         read(lock_ctx->fd[0], &c, 1);
452         locked = (c == 0 ? true : false);
453
454         /* Update statistics */
455         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
456         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
457         if (lock_ctx->ctdb_db) {
458                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
459                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
460         }
461
462         if (locked) {
463                 if (lock_ctx->ctdb_db) {
464                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
465                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
466                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
467                                             lock_type_str[lock_ctx->type], locks.latency,
468                                             lock_ctx->start_time);
469
470                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
471                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
472                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
473                 }
474         } else {
475                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
476                 if (lock_ctx->ctdb_db) {
477                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
478                 }
479         }
480
481         process_callbacks(lock_ctx, locked);
482
483         if (lock_ctx->auto_mark) {
484                 talloc_free(tmp_ctx);
485         }
486 }
487
488
489 /*
490  * Callback routine when required locks are not obtained within timeout
491  * Called from parent context
492  */
493 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
494                                     struct tevent_timer *ttimer,
495                                     struct timeval current_time,
496                                     void *private_data)
497 {
498         const char *cmd = getenv("CTDB_DEBUG_LOCKS");
499         struct lock_context *lock_ctx;
500         struct ctdb_context *ctdb;
501         pid_t pid;
502
503         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
504         ctdb = lock_ctx->ctdb;
505
506         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
507                 DEBUG(DEBUG_WARNING,
508                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
509                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
510                        lock_ctx->ctdb_db->db_name,
511                        timeval_elapsed(&lock_ctx->start_time)));
512         } else {
513                 DEBUG(DEBUG_WARNING,
514                       ("Unable to get ALLDB locks for %.0lf seconds\n",
515                        timeval_elapsed(&lock_ctx->start_time)));
516         }
517
518         /* fire a child process to find the blocking process */
519         if (cmd != NULL) {
520                 pid = fork();
521                 if (pid == 0) {
522                         execl(cmd, cmd, NULL);
523                 }
524         }
525
526         /* reset the timeout timer */
527         // talloc_free(lock_ctx->ttimer);
528         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
529                                             lock_ctx,
530                                             timeval_current_ofs(10, 0),
531                                             ctdb_lock_timeout_handler,
532                                             (void *)lock_ctx);
533 }
534
535
/*
 * db_handler_t callback that counts databases.
 *
 * private_data points to an int, which is incremented by one per call.
 * Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
                            void *private_data)
{
        int *counter = private_data;

        *counter += 1;

        return 0;
}
545
546 struct db_namelist {
547         char **names;
548         int n;
549 };
550
551 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
552                            void *private_data)
553 {
554         struct db_namelist *list = (struct db_namelist *)private_data;
555
556         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
557         list->n++;
558
559         return 0;
560 }
561
562 static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
563 {
564         struct ctdb_context *ctdb = lock_ctx->ctdb;
565         char **args = NULL;
566         int nargs, i;
567         int priority;
568         struct db_namelist list;
569
570         switch (lock_ctx->type) {
571         case LOCK_RECORD:
572                 nargs = 6;
573                 break;
574
575         case LOCK_DB:
576                 nargs = 5;
577                 break;
578
579         case LOCK_ALLDB_PRIO:
580                 nargs = 4;
581                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
582                 break;
583
584         case LOCK_ALLDB:
585                 nargs = 4;
586                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
587                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
588                 }
589                 break;
590         }
591
592         /* Add extra argument for null termination */
593         nargs++;
594
595         args = talloc_array(mem_ctx, char *, nargs);
596         if (args == NULL) {
597                 return NULL;
598         }
599
600         args[0] = talloc_strdup(args, "ctdb_lock_helper");
601         args[1] = talloc_asprintf(args, "%d", getpid());
602         args[2] = talloc_asprintf(args, "%d", fd);
603
604         switch (lock_ctx->type) {
605         case LOCK_RECORD:
606                 args[3] = talloc_strdup(args, "RECORD");
607                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
608                 if (lock_ctx->key.dsize == 0) {
609                         args[5] = talloc_strdup(args, "NULL");
610                 } else {
611                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
612                 }
613                 break;
614
615         case LOCK_DB:
616                 args[3] = talloc_strdup(args, "DB");
617                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
618                 break;
619
620         case LOCK_ALLDB_PRIO:
621                 args[3] = talloc_strdup(args, "DB");
622                 list.names = args;
623                 list.n = 4;
624                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
625                 break;
626
627         case LOCK_ALLDB:
628                 args[3] = talloc_strdup(args, "DB");
629                 list.names = args;
630                 list.n = 4;
631                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
632                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
633                 }
634                 break;
635         }
636
637         /* Make sure last argument is NULL */
638         args[nargs-1] = NULL;
639
640         for (i=0; i<nargs-1; i++) {
641                 if (args[i] == NULL) {
642                         talloc_free(args);
643                         return NULL;
644                 }
645         }
646
647         return args;
648 }
649
650
651 /*
652  * Find the lock context of a given type
653  */
654 static struct lock_context *find_lock_context(struct lock_context *lock_list,
655                                               struct ctdb_db_context *ctdb_db,
656                                               TDB_DATA key,
657                                               uint32_t priority,
658                                               enum lock_type type)
659 {
660         struct lock_context *lock_ctx;
661
662         /* Search active locks */
663         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
664                 if (lock_ctx->type != type) {
665                         continue;
666                 }
667
668                 switch (lock_ctx->type) {
669                 case LOCK_RECORD:
670                         if (ctdb_db == lock_ctx->ctdb_db &&
671                             key.dsize == lock_ctx->key.dsize &&
672                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
673                                 goto done;
674                         }
675                         break;
676
677                 case LOCK_DB:
678                         if (ctdb_db == lock_ctx->ctdb_db) {
679                                 goto done;
680                         }
681                         break;
682
683                 case LOCK_ALLDB_PRIO:
684                         if (priority == lock_ctx->priority) {
685                                 goto done;
686                         }
687                         break;
688
689                 case LOCK_ALLDB:
690                         goto done;
691                         break;
692                 }
693         }
694
695         /* Did not find the lock context we are searching for */
696         lock_ctx = NULL;
697
698 done:
699         return lock_ctx;
700
701 }
702
703
704 /*
705  * Schedule a new lock child process
706  * Set up callback handler and timeout handler
707  */
708 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
709 {
710         struct lock_context *lock_ctx, *next_ctx;
711         int ret;
712         TALLOC_CTX *tmp_ctx;
713         const char *helper = BINDIR "/ctdb_lock_helper";
714         static const char *prog = NULL;
715         char **args;
716
717         if (prog == NULL) {
718                 const char *t;
719
720                 t = getenv("CTDB_LOCK_HELPER");
721                 if (t != NULL) {
722                         prog = talloc_strdup(ctdb, t);
723                 } else {
724                         prog = talloc_strdup(ctdb, helper);
725                 }
726                 CTDB_NO_MEMORY_VOID(ctdb, prog);
727         }
728
729         if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
730                 return;
731         }
732
733         if (ctdb->lock_pending == NULL) {
734                 return;
735         }
736
737         /* Find a lock context with requests */
738         lock_ctx = ctdb->lock_pending;
739         while (lock_ctx != NULL) {
740                 if (! lock_ctx->req_queue) {
741                         next_ctx = lock_ctx->next;
742                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
743                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
744                         ctdb->lock_num_pending--;
745                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
746                         if (lock_ctx->ctdb_db) {
747                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
748                         }
749                         talloc_free(lock_ctx);
750                         lock_ctx = next_ctx;
751                         continue;
752                 } else {
753                         /* Found a lock context with lock requests */
754                         break;
755                 }
756         }
757
758         if (lock_ctx == NULL) {
759                 return;
760         }
761
762         lock_ctx->child = -1;
763         ret = pipe(lock_ctx->fd);
764         if (ret != 0) {
765                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
766                 return;
767         }
768
769         set_close_on_exec(lock_ctx->fd[0]);
770
771         /* Create data for child process */
772         tmp_ctx = talloc_new(lock_ctx);
773         if (tmp_ctx == NULL) {
774                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
775                 close(lock_ctx->fd[0]);
776                 close(lock_ctx->fd[1]);
777                 return;
778         }
779
780         /* Create arguments for lock helper */
781         args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
782         if (args == NULL) {
783                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
784                 close(lock_ctx->fd[0]);
785                 close(lock_ctx->fd[1]);
786                 talloc_free(tmp_ctx);
787                 return;
788         }
789
790         lock_ctx->child = ctdb_fork(ctdb);
791
792         if (lock_ctx->child == (pid_t)-1) {
793                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
794                 close(lock_ctx->fd[0]);
795                 close(lock_ctx->fd[1]);
796                 talloc_free(tmp_ctx);
797                 return;
798         }
799
800
801         /* Child process */
802         if (lock_ctx->child == 0) {
803                 ret = execv(prog, args);
804                 if (ret < 0) {
805                         DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
806                                           prog, errno, strerror(errno)));
807                 }
808                 _exit(1);
809         }
810
811         /* Parent process */
812         close(lock_ctx->fd[1]);
813
814         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
815
816         talloc_free(tmp_ctx);
817
818         /* Set up timeout handler */
819         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
820                                             lock_ctx,
821                                             timeval_current_ofs(10, 0),
822                                             ctdb_lock_timeout_handler,
823                                             (void *)lock_ctx);
824         if (lock_ctx->ttimer == NULL) {
825                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
826                 lock_ctx->child = -1;
827                 talloc_set_destructor(lock_ctx, NULL);
828                 close(lock_ctx->fd[0]);
829                 return;
830         }
831
832         /* Set up callback */
833         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
834                                       lock_ctx,
835                                       lock_ctx->fd[0],
836                                       EVENT_FD_READ,
837                                       ctdb_lock_handler,
838                                       (void *)lock_ctx);
839         if (lock_ctx->tfd == NULL) {
840                 TALLOC_FREE(lock_ctx->ttimer);
841                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
842                 lock_ctx->child = -1;
843                 talloc_set_destructor(lock_ctx, NULL);
844                 close(lock_ctx->fd[0]);
845                 return;
846         }
847         tevent_fd_set_auto_close(lock_ctx->tfd);
848
849         /* Move the context from pending to current */
850         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
851         ctdb->lock_num_pending--;
852         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
853         ctdb->lock_num_current++;
854 }
855
856
857 /*
858  * Lock record / db depending on type
859  */
860 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
861                                                struct ctdb_db_context *ctdb_db,
862                                                TDB_DATA key,
863                                                uint32_t priority,
864                                                void (*callback)(void *, bool),
865                                                void *private_data,
866                                                enum lock_type type,
867                                                bool auto_mark)
868 {
869         struct lock_context *lock_ctx;
870         struct lock_request *request;
871
872         if (callback == NULL) {
873                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
874                 return NULL;
875         }
876
877         /* get a context for this key - search only the pending contexts,
878          * current contexts might in the middle of processing callbacks */
879         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
880
881         /* No existing context, create one */
882         if (lock_ctx == NULL) {
883                 lock_ctx = talloc_zero(ctdb, struct lock_context);
884                 if (lock_ctx == NULL) {
885                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
886                         return NULL;
887                 }
888
889                 lock_ctx->type = type;
890                 lock_ctx->ctdb = ctdb;
891                 lock_ctx->ctdb_db = ctdb_db;
892                 lock_ctx->key.dsize = key.dsize;
893                 if (key.dsize > 0) {
894                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
895                 } else {
896                         lock_ctx->key.dptr = NULL;
897                 }
898                 lock_ctx->priority = priority;
899                 lock_ctx->auto_mark = auto_mark;
900
901                 lock_ctx->child = -1;
902                 lock_ctx->block_child = -1;
903
904                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
905                 ctdb->lock_num_pending++;
906                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
907                 if (ctdb_db) {
908                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
909                 }
910
911                 /* Start the timer when we activate the context */
912                 lock_ctx->start_time = timeval_current();
913         }
914
915         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
916                 return NULL;
917         }
918
919         request->lctx = lock_ctx;
920         request->callback = callback;
921         request->private_data = private_data;
922
923         talloc_set_destructor(request, ctdb_lock_request_destructor);
924         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
925
926         ctdb_lock_schedule(ctdb);
927
928         return request;
929 }
930
931
932 /*
933  * obtain a lock on a record in a database
934  */
935 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
936                                       TDB_DATA key,
937                                       bool auto_mark,
938                                       void (*callback)(void *, bool),
939                                       void *private_data)
940 {
941         return ctdb_lock_internal(ctdb_db->ctdb,
942                                   ctdb_db,
943                                   key,
944                                   0,
945                                   callback,
946                                   private_data,
947                                   LOCK_RECORD,
948                                   auto_mark);
949 }
950
951
952 /*
953  * obtain a lock on a database
954  */
955 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
956                                   bool auto_mark,
957                                   void (*callback)(void *, bool),
958                                   void *private_data)
959 {
960         return ctdb_lock_internal(ctdb_db->ctdb,
961                                   ctdb_db,
962                                   tdb_null,
963                                   0,
964                                   callback,
965                                   private_data,
966                                   LOCK_DB,
967                                   auto_mark);
968 }
969
970
971 /*
972  * obtain locks on all databases of specified priority
973  */
974 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
975                                           uint32_t priority,
976                                           bool auto_mark,
977                                           void (*callback)(void *, bool),
978                                           void *private_data)
979 {
980         if (priority < 0 || priority > NUM_DB_PRIORITIES) {
981                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
982                 return NULL;
983         }
984
985         return ctdb_lock_internal(ctdb,
986                                   NULL,
987                                   tdb_null,
988                                   priority,
989                                   callback,
990                                   private_data,
991                                   LOCK_ALLDB_PRIO,
992                                   auto_mark);
993 }
994
995
996 /*
997  * obtain locks on all databases
998  */
999 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1000                                      bool auto_mark,
1001                                      void (*callback)(void *, bool),
1002                                      void *private_data)
1003 {
1004         return ctdb_lock_internal(ctdb,
1005                                   NULL,
1006                                   tdb_null,
1007                                   0,
1008                                   callback,
1009                                   private_data,
1010                                   LOCK_ALLDB,
1011                                   auto_mark);
1012 }
1013