ctdb-locking: There are no ALLDB locks any more
[samba.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks on a single record or a single database
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30 #include "lib/util/samba_util.h"
31 #include "lib/util/sys_rw.h"
32
33 #include "ctdb_private.h"
34
35 #include "common/common.h"
36 #include "common/logging.h"
37
38 /*
39  * Non-blocking Locking API
40  *
41  * 1. Create a child process to do blocking locks.
42  * 2. Once the locks are obtained, signal parent process via fd.
43  * 3. Invoke registered callback routine with locking status.
44  * 4. If the child process cannot get locks within certain time,
45  *    execute an external script to debug.
46  *
47  * ctdb_lock_record()      - get a lock on a record
48  * ctdb_lock_db()          - get a lock on a DB
49  *
50  *  auto_mark              - whether to mark the lock before, and unmark it after, the callback
51  *                           = false is used for freezing databases for
52  *                           recovery since the recovery cannot start till
53  *                           databases are locked on all the nodes.
54  *                           = true is used for record locks.
55  */
56
/*
 * Kinds of lock that can be requested.  The numeric values are used as
 * indices into lock_type_str[], so the two must be kept in step.
 */
enum lock_type {
        LOCK_RECORD = 0,
        LOCK_DB = 1,
};

/* Operation name passed to the latency statistics for each lock type */
static const char * const lock_type_str[] = {
        [LOCK_RECORD] = "lock_record",
        [LOCK_DB] = "lock_db",
};
66
67 struct lock_request;
68
69 /* lock_context is the common part for a lock request */
70 struct lock_context {
71         struct lock_context *next, *prev;
72         enum lock_type type;
73         struct ctdb_context *ctdb;
74         struct ctdb_db_context *ctdb_db;
75         TDB_DATA key;
76         uint32_t priority;
77         bool auto_mark;
78         struct lock_request *request;
79         pid_t child;
80         int fd[2];
81         struct tevent_fd *tfd;
82         struct tevent_timer *ttimer;
83         struct timeval start_time;
84         uint32_t key_hash;
85         bool can_schedule;
86 };
87
/*
 * lock_request is the client specific part of a lock request: the handle
 * given back to the caller together with its completion callback.
 */
struct lock_request {
        /* Lock context doing the work; NULL once the two are detached */
        struct lock_context *lctx;
        /* Called with private_data and the outcome (true when locked) */
        void (*callback)(void *private_data, bool locked);
        /* Opaque pointer handed back to callback */
        void *private_data;
};
94
95
96 int ctdb_db_iterator(struct ctdb_context *ctdb, ctdb_db_handler_t handler,
97                      void *private_data)
98 {
99         struct ctdb_db_context *ctdb_db;
100         int ret;
101
102         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
103                 ret = handler(ctdb_db, private_data);
104                 if (ret != 0) {
105                         return -1;
106                 }
107         }
108
109         return 0;
110 }
111
112 /*
113  * lock all databases - mark only
114  */
115 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db,
116                                 void *private_data)
117 {
118         int tdb_transaction_write_lock_mark(struct tdb_context *);
119
120         DEBUG(DEBUG_INFO, ("marking locked database %s\n", ctdb_db->db_name));
121
122         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
123                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
124                                   ctdb_db->db_name));
125                 return -1;
126         }
127
128         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
129                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
130                                   ctdb_db->db_name));
131                 return -1;
132         }
133
134         return 0;
135 }
136
137 int ctdb_lockdb_mark(struct ctdb_db_context *ctdb_db)
138 {
139         if (!ctdb_db_frozen(ctdb_db)) {
140                 DEBUG(DEBUG_ERR,
141                       ("Attempt to mark database locked when not frozen\n"));
142                 return -1;
143         }
144
145         return db_lock_mark_handler(ctdb_db, NULL);
146 }
147
148 /*
149  * lock all databases - unmark only
150  */
151 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db,
152                                   void *private_data)
153 {
154         int tdb_transaction_write_lock_unmark(struct tdb_context *);
155
156         DEBUG(DEBUG_INFO, ("unmarking locked database %s\n", ctdb_db->db_name));
157
158         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
159                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
160                                   ctdb_db->db_name));
161                 return -1;
162         }
163
164         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
165                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
166                                   ctdb_db->db_name));
167                 return -1;
168         }
169
170         return 0;
171 }
172
173 int ctdb_lockdb_unmark(struct ctdb_db_context *ctdb_db)
174 {
175         if (!ctdb_db_frozen(ctdb_db)) {
176                 DEBUG(DEBUG_ERR,
177                       ("Attempt to unmark database locked when not frozen\n"));
178                 return -1;
179         }
180
181         return db_lock_unmark_handler(ctdb_db, NULL);
182 }
183
184 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
185
186 /*
187  * Destructor to kill the child locking process
188  */
189 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
190 {
191         if (lock_ctx->request) {
192                 lock_ctx->request->lctx = NULL;
193         }
194         if (lock_ctx->child > 0) {
195                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGTERM);
196                 if (lock_ctx->type == LOCK_RECORD) {
197                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
198                 } else {
199                         DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
200                 }
201                 if (lock_ctx->ctdb_db) {
202                         lock_ctx->ctdb_db->lock_num_current--;
203                 }
204                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
205                 if (lock_ctx->ctdb_db) {
206                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
207                 }
208         } else {
209                 if (lock_ctx->type == LOCK_RECORD) {
210                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
211                 } else {
212                         DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
213                 }
214                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
215                 if (lock_ctx->ctdb_db) {
216                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
217                 }
218         }
219
220         ctdb_lock_schedule(lock_ctx->ctdb);
221
222         return 0;
223 }
224
225
226 /*
227  * Destructor to remove lock request
228  */
229 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
230 {
231         if (lock_request->lctx == NULL) {
232                 return 0;
233         }
234
235         lock_request->lctx->request = NULL;
236         TALLOC_FREE(lock_request->lctx);
237
238         return 0;
239 }
240
241 /*
242  * Process all the callbacks waiting for lock
243  *
244  * If lock has failed, callback is executed with locked=false
245  */
246 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
247 {
248         struct lock_request *request;
249         bool auto_mark = lock_ctx->auto_mark;
250
251         if (auto_mark && locked) {
252                 switch (lock_ctx->type) {
253                 case LOCK_RECORD:
254                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
255                         break;
256
257                 case LOCK_DB:
258                         ctdb_lockdb_mark(lock_ctx->ctdb_db);
259                         break;
260                 }
261         }
262
263         request = lock_ctx->request;
264         if (auto_mark) {
265                 /* Since request may be freed in the callback, unset the lock
266                  * context, so request destructor will not free lock context.
267                  */
268                 request->lctx = NULL;
269         }
270
271         /* Since request may be freed in the callback, unset the request */
272         lock_ctx->request = NULL;
273
274         request->callback(request->private_data, locked);
275
276         if (!auto_mark) {
277                 return;
278         }
279
280         if (locked) {
281                 switch (lock_ctx->type) {
282                 case LOCK_RECORD:
283                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
284                         break;
285
286                 case LOCK_DB:
287                         ctdb_lockdb_unmark(lock_ctx->ctdb_db);
288                         break;
289                 }
290         }
291
292         talloc_free(lock_ctx);
293 }
294
295
/*
 * Map a lock latency t (in seconds) to a histogram bucket index.
 *
 * Buckets 0..9 cover t below 1ms, 10ms, 100ms, 1s, 2s, 4s, 8s, 16s, 32s
 * and 64s respectively; bucket 10 takes everything else.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        /* Exclusive upper bound of each bucket, smallest first */
        const double upper[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int num_bounds = (int)(sizeof(upper) / sizeof(upper[0]));
        int id = 0;

        /* Walk up until t fits below a bound; running off the end of the
         * table leaves id at the catch-all bucket.
         */
        while (id < num_bounds && !(t < upper[id])) {
                id++;
        }

        return id;
}
327
328 /*
329  * Callback routine when the required locks are obtained.
330  * Called from parent context
331  */
332 static void ctdb_lock_handler(struct tevent_context *ev,
333                             struct tevent_fd *tfd,
334                             uint16_t flags,
335                             void *private_data)
336 {
337         struct lock_context *lock_ctx;
338         char c;
339         bool locked;
340         double t;
341         int id;
342
343         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
344
345         /* cancel the timeout event */
346         TALLOC_FREE(lock_ctx->ttimer);
347
348         t = timeval_elapsed(&lock_ctx->start_time);
349         id = lock_bucket_id(t);
350
351         /* Read the status from the child process */
352         if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
353                 locked = false;
354         } else {
355                 locked = (c == 0 ? true : false);
356         }
357
358         /* Update statistics */
359         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
360         if (lock_ctx->ctdb_db) {
361                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
362         }
363
364         if (locked) {
365                 if (lock_ctx->ctdb_db) {
366                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
367                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
368                                             lock_type_str[lock_ctx->type], locks.latency,
369                                             lock_ctx->start_time);
370
371                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
372                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
373                 }
374         } else {
375                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
376                 if (lock_ctx->ctdb_db) {
377                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
378                 }
379         }
380
381         process_callbacks(lock_ctx, locked);
382 }
383
384
385 /*
386  * Callback routine when required locks are not obtained within timeout
387  * Called from parent context
388  */
389 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
390                                     struct tevent_timer *ttimer,
391                                     struct timeval current_time,
392                                     void *private_data)
393 {
394         static char debug_locks[PATH_MAX+1] = "";
395         static struct timeval last_debug_time;
396         struct lock_context *lock_ctx;
397         struct ctdb_context *ctdb;
398         struct timeval now;
399         pid_t pid;
400         double elapsed_time;
401         int new_timer;
402
403         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
404         ctdb = lock_ctx->ctdb;
405
406         elapsed_time = timeval_elapsed(&lock_ctx->start_time);
407         DEBUG(DEBUG_WARNING,
408               ("Unable to get %s lock on database %s for %.0lf seconds\n",
409                (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
410                lock_ctx->ctdb_db->db_name, elapsed_time));
411
412         /* If a node stopped/banned, don't spam the logs */
413         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
414                 goto skip_lock_debug;
415         }
416
417         /* Restrict log debugging to once per second */
418         now = timeval_current();
419         if (last_debug_time.tv_sec == now.tv_sec) {
420                 goto skip_lock_debug;
421         }
422
423         last_debug_time.tv_sec = now.tv_sec;
424
425         if (ctdb_set_helper("lock debugging helper",
426                             debug_locks, sizeof(debug_locks),
427                             "CTDB_DEBUG_LOCKS",
428                             getenv("CTDB_BASE"), "debug_locks.sh")) {
429                 pid = vfork();
430                 if (pid == 0) {
431                         execl(debug_locks, debug_locks, NULL);
432                         _exit(0);
433                 }
434                 ctdb_track_child(ctdb, pid);
435         } else {
436                 DEBUG(DEBUG_WARNING,
437                       (__location__
438                        " Unable to setup lock debugging\n"));
439         }
440
441 skip_lock_debug:
442
443         /* Back-off logging if lock is not obtained for a long time */
444         if (elapsed_time < 100.0) {
445                 new_timer = 10;
446         } else if (elapsed_time < 1000.0) {
447                 new_timer = 100;
448         } else {
449                 new_timer = 1000;
450         }
451
452         /* reset the timeout timer */
453         // talloc_free(lock_ctx->ttimer);
454         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
455                                             lock_ctx,
456                                             timeval_current_ofs(new_timer, 0),
457                                             ctdb_lock_timeout_handler,
458                                             (void *)lock_ctx);
459 }
460
461 static int db_flags(struct ctdb_db_context *ctdb_db)
462 {
463         int tdb_flags = TDB_DEFAULT;
464
465 #ifdef TDB_MUTEX_LOCKING
466         if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
467                 tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
468         }
469 #endif
470         return tdb_flags;
471 }
472
473 static bool lock_helper_args(TALLOC_CTX *mem_ctx,
474                              struct lock_context *lock_ctx, int fd,
475                              int *argc, const char ***argv)
476 {
477         const char **args = NULL;
478         int nargs = 0, i;
479
480         switch (lock_ctx->type) {
481         case LOCK_RECORD:
482                 nargs = 6;
483                 break;
484
485         case LOCK_DB:
486                 nargs = 5;
487                 break;
488         }
489
490         /* Add extra argument for null termination */
491         nargs++;
492
493         args = talloc_array(mem_ctx, const char *, nargs);
494         if (args == NULL) {
495                 return false;
496         }
497
498         args[0] = talloc_asprintf(args, "%d", getpid());
499         args[1] = talloc_asprintf(args, "%d", fd);
500
501         switch (lock_ctx->type) {
502         case LOCK_RECORD:
503                 args[2] = talloc_strdup(args, "RECORD");
504                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
505                 args[4] = talloc_asprintf(args, "0x%x",
506                                           db_flags(lock_ctx->ctdb_db));
507                 if (lock_ctx->key.dsize == 0) {
508                         args[5] = talloc_strdup(args, "NULL");
509                 } else {
510                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
511                 }
512                 break;
513
514         case LOCK_DB:
515                 args[2] = talloc_strdup(args, "DB");
516                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
517                 args[4] = talloc_asprintf(args, "0x%x",
518                                           db_flags(lock_ctx->ctdb_db));
519                 break;
520         }
521
522         /* Make sure last argument is NULL */
523         args[nargs-1] = NULL;
524
525         for (i=0; i<nargs-1; i++) {
526                 if (args[i] == NULL) {
527                         talloc_free(args);
528                         return false;
529                 }
530         }
531
532         *argc = nargs;
533         *argv = args;
534         return true;
535 }
536
537 /*
538  * Find a lock request that can be scheduled
539  */
540 static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
541 {
542         struct lock_context *lock_ctx, *next_ctx;
543         struct ctdb_db_context *ctdb_db;
544
545         /* First check if there are database lock requests */
546
547         for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
548              lock_ctx = next_ctx) {
549
550                 if (lock_ctx->request != NULL) {
551                         /* Found a lock context with a request */
552                         return lock_ctx;
553                 }
554
555                 next_ctx = lock_ctx->next;
556
557                 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
558                                    "request\n"));
559                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
560                 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
561                 if (lock_ctx->ctdb_db) {
562                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
563                                                locks.num_pending);
564                 }
565                 talloc_free(lock_ctx);
566         }
567
568         /* Next check database queues */
569         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
570                 if (ctdb_db->lock_num_current ==
571                     ctdb->tunable.lock_processes_per_db) {
572                         continue;
573                 }
574
575                 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
576                      lock_ctx = next_ctx) {
577
578                         next_ctx = lock_ctx->next;
579
580                         if (lock_ctx->request != NULL) {
581                                 return lock_ctx;
582                         }
583
584                         DEBUG(DEBUG_INFO, ("Removing lock context without "
585                                            "lock request\n"));
586                         DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
587                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
588                         CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
589                         talloc_free(lock_ctx);
590                 }
591         }
592
593         return NULL;
594 }
595
596 /*
597  * Schedule a new lock child process
598  * Set up callback handler and timeout handler
599  */
600 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
601 {
602         struct lock_context *lock_ctx;
603         int ret, argc;
604         TALLOC_CTX *tmp_ctx;
605         static char prog[PATH_MAX+1] = "";
606         const char **args;
607
608         if (!ctdb_set_helper("lock helper",
609                              prog, sizeof(prog),
610                              "CTDB_LOCK_HELPER",
611                              CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
612                 ctdb_die(ctdb, __location__
613                          " Unable to set lock helper\n");
614         }
615
616         /* Find a lock context with requests */
617         lock_ctx = ctdb_find_lock_context(ctdb);
618         if (lock_ctx == NULL) {
619                 return;
620         }
621
622         lock_ctx->child = -1;
623         ret = pipe(lock_ctx->fd);
624         if (ret != 0) {
625                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
626                 return;
627         }
628
629         set_close_on_exec(lock_ctx->fd[0]);
630
631         /* Create data for child process */
632         tmp_ctx = talloc_new(lock_ctx);
633         if (tmp_ctx == NULL) {
634                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
635                 close(lock_ctx->fd[0]);
636                 close(lock_ctx->fd[1]);
637                 return;
638         }
639
640         if (! ctdb->do_setsched) {
641                 ret = setenv("CTDB_NOSETSCHED", "1", 1);
642                 if (ret != 0) {
643                         DEBUG(DEBUG_WARNING,
644                               ("Failed to set CTDB_NOSETSCHED variable\n"));
645                 }
646         }
647
648         /* Create arguments for lock helper */
649         if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
650                               &argc, &args)) {
651                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
652                 close(lock_ctx->fd[0]);
653                 close(lock_ctx->fd[1]);
654                 talloc_free(tmp_ctx);
655                 return;
656         }
657
658         lock_ctx->child = ctdb_vfork_exec(lock_ctx, ctdb, prog, argc,
659                                           (const char **)args);
660         if (lock_ctx->child == -1) {
661                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
662                 close(lock_ctx->fd[0]);
663                 close(lock_ctx->fd[1]);
664                 talloc_free(tmp_ctx);
665                 return;
666         }
667
668         /* Parent process */
669         close(lock_ctx->fd[1]);
670
671         talloc_free(tmp_ctx);
672
673         /* Set up timeout handler */
674         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
675                                             lock_ctx,
676                                             timeval_current_ofs(10, 0),
677                                             ctdb_lock_timeout_handler,
678                                             (void *)lock_ctx);
679         if (lock_ctx->ttimer == NULL) {
680                 ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
681                 lock_ctx->child = -1;
682                 close(lock_ctx->fd[0]);
683                 return;
684         }
685
686         /* Set up callback */
687         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
688                                       lock_ctx,
689                                       lock_ctx->fd[0],
690                                       TEVENT_FD_READ,
691                                       ctdb_lock_handler,
692                                       (void *)lock_ctx);
693         if (lock_ctx->tfd == NULL) {
694                 TALLOC_FREE(lock_ctx->ttimer);
695                 ctdb_kill(ctdb, lock_ctx->child, SIGTERM);
696                 lock_ctx->child = -1;
697                 close(lock_ctx->fd[0]);
698                 return;
699         }
700         tevent_fd_set_auto_close(lock_ctx->tfd);
701
702         /* Move the context from pending to current */
703         if (lock_ctx->type == LOCK_RECORD) {
704                 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
705                 DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx);
706         } else {
707                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
708                 DLIST_ADD_END(ctdb->lock_current, lock_ctx);
709         }
710         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
711         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
712         if (lock_ctx->ctdb_db) {
713                 lock_ctx->ctdb_db->lock_num_current++;
714                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
715                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
716         }
717 }
718
719
720 /*
721  * Lock record / db depending on type
722  */
723 static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
724                                                struct ctdb_context *ctdb,
725                                                struct ctdb_db_context *ctdb_db,
726                                                TDB_DATA key,
727                                                uint32_t priority,
728                                                void (*callback)(void *, bool),
729                                                void *private_data,
730                                                enum lock_type type,
731                                                bool auto_mark)
732 {
733         struct lock_context *lock_ctx = NULL;
734         struct lock_request *request;
735
736         if (callback == NULL) {
737                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
738                 return NULL;
739         }
740
741         lock_ctx = talloc_zero(ctdb, struct lock_context);
742         if (lock_ctx == NULL) {
743                 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
744                 return NULL;
745         }
746
747         if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
748                 talloc_free(lock_ctx);
749                 return NULL;
750         }
751
752         lock_ctx->type = type;
753         lock_ctx->ctdb = ctdb;
754         lock_ctx->ctdb_db = ctdb_db;
755         lock_ctx->key.dsize = key.dsize;
756         if (key.dsize > 0) {
757                 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
758                 if (lock_ctx->key.dptr == NULL) {
759                         DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
760                         talloc_free(lock_ctx);
761                         talloc_free(request);
762                         return NULL;
763                 }
764                 lock_ctx->key_hash = ctdb_hash(&key);
765         } else {
766                 lock_ctx->key.dptr = NULL;
767         }
768         lock_ctx->priority = priority;
769         lock_ctx->auto_mark = auto_mark;
770
771         lock_ctx->request = request;
772         lock_ctx->child = -1;
773
774         /* Non-record locks are required by recovery and should be scheduled
775          * immediately, so keep them at the head of the pending queue.
776          */
777         if (lock_ctx->type == LOCK_RECORD) {
778                 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx);
779         } else {
780                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx);
781         }
782         CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
783         if (ctdb_db) {
784                 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
785         }
786
787         /* Start the timer when we activate the context */
788         lock_ctx->start_time = timeval_current();
789
790         request->lctx = lock_ctx;
791         request->callback = callback;
792         request->private_data = private_data;
793
794         talloc_set_destructor(request, ctdb_lock_request_destructor);
795         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
796
797         ctdb_lock_schedule(ctdb);
798
799         return request;
800 }
801
802
803 /*
804  * obtain a lock on a record in a database
805  */
806 struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
807                                       struct ctdb_db_context *ctdb_db,
808                                       TDB_DATA key,
809                                       bool auto_mark,
810                                       void (*callback)(void *, bool),
811                                       void *private_data)
812 {
813         return ctdb_lock_internal(mem_ctx,
814                                   ctdb_db->ctdb,
815                                   ctdb_db,
816                                   key,
817                                   0,
818                                   callback,
819                                   private_data,
820                                   LOCK_RECORD,
821                                   auto_mark);
822 }
823
824
825 /*
826  * obtain a lock on a database
827  */
828 struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
829                                   struct ctdb_db_context *ctdb_db,
830                                   bool auto_mark,
831                                   void (*callback)(void *, bool),
832                                   void *private_data)
833 {
834         return ctdb_lock_internal(mem_ctx,
835                                   ctdb_db->ctdb,
836                                   ctdb_db,
837                                   tdb_null,
838                                   0,
839                                   callback,
840                                   private_data,
841                                   LOCK_DB,
842                                   auto_mark);
843 }