ctdb-locking: make process_callbacks() more robust
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "lib/tdb_wrap/tdb_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    execute an external script to debug.
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  *                           = false is used for freezing databases for
45  *                           recovery since the recovery cannot start till
46  *                           databases are locked on all the nodes.
47  *                           = true is used for record locks.
48  */
49
/* Kinds of lock that can be requested through this API. */
enum lock_type {
        LOCK_RECORD = 0,
        LOCK_DB,
        LOCK_ALLDB_PRIO,
        LOCK_ALLDB,
};

/* Printable name for each lock type; indexed by enum lock_type. */
static const char * const lock_type_str[] = {
        [LOCK_RECORD]     = "lock_record",
        [LOCK_DB]         = "lock_db",
        [LOCK_ALLDB_PRIO] = "lock_alldb_prio",
        [LOCK_ALLDB]      = "lock_alldb",
};
63
64 struct lock_request;
65
66 /* lock_context is the common part for a lock request */
67 struct lock_context {
68         struct lock_context *next, *prev;
69         enum lock_type type;
70         struct ctdb_context *ctdb;
71         struct ctdb_db_context *ctdb_db;
72         TDB_DATA key;
73         uint32_t priority;
74         bool auto_mark;
75         struct lock_request *request;
76         pid_t child;
77         int fd[2];
78         struct tevent_fd *tfd;
79         struct tevent_timer *ttimer;
80         struct timeval start_time;
81         uint32_t key_hash;
82         bool can_schedule;
83 };
84
/*
 * lock_request: the client-specific part of a lock request.
 *
 * lctx points back to the common lock context (or is NULL once the two
 * have been detached); callback is invoked with private_data and the
 * locking status.
 */
struct lock_request {
        struct lock_context *lctx;
        void (*callback)(void *private_data, bool locked);
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
117                             uint32_t priority,
118                             void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases - mark only
160  */
161 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                                 void *private_data)
163 {
164         int tdb_transaction_write_lock_mark(struct tdb_context *);
165
166         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
167                            ctdb_db->db_name, priority));
168
169         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
170                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
171                                   ctdb_db->db_name));
172                 return -1;
173         }
174
175         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
176                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
177                                   ctdb_db->db_name));
178                 return -1;
179         }
180
181         return 0;
182 }
183
184 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
185 {
186         /*
187          * This function is only used by the main dameon during recovery.
188          * At this stage, the databases have already been locked, by a
189          * dedicated child process. The freeze_mode variable is used to track
190          * whether the actual locks are held by the child process or not.
191          */
192
193         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
194                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
195                 return -1;
196         }
197
198         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
199 }
200
201 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
202 {
203         uint32_t priority;
204
205         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
206                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
207                         return -1;
208                 }
209         }
210
211         return 0;
212 }
213
214
215 /*
216  * lock all databases - unmark only
217  */
218 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
219                                   void *private_data)
220 {
221         int tdb_transaction_write_lock_unmark(struct tdb_context *);
222
223         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
224                            ctdb_db->db_name, priority));
225
226         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
227                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
228                                   ctdb_db->db_name));
229                 return -1;
230         }
231
232         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
233                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
234                                   ctdb_db->db_name));
235                 return -1;
236         }
237
238         return 0;
239 }
240
241 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
242 {
243         /*
244          * This function is only used by the main daemon during recovery.
245          * At this stage, the databases have already been locked, by a
246          * dedicated child process. The freeze_mode variable is used to track
247          * whether the actual locks are held by the child process or not.
248          */
249
250         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
251                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
252                 return -1;
253         }
254
255         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
256 }
257
258 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
259 {
260         uint32_t priority;
261
262         for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
263                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
273
274 /*
275  * Destructor to kill the child locking process
276  */
277 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
278 {
279         if (lock_ctx->request) {
280                 lock_ctx->request->lctx = NULL;
281         }
282         if (lock_ctx->child > 0) {
283                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
284                 if (lock_ctx->type == LOCK_RECORD) {
285                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
286                 } else {
287                         DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
288                 }
289                 if (lock_ctx->ctdb_db) {
290                         lock_ctx->ctdb_db->lock_num_current--;
291                 }
292                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
293                 if (lock_ctx->ctdb_db) {
294                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
295                 }
296         } else {
297                 if (lock_ctx->type == LOCK_RECORD) {
298                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
299                 } else {
300                         DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
301                 }
302                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
303                 if (lock_ctx->ctdb_db) {
304                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
305                 }
306         }
307
308         ctdb_lock_schedule(lock_ctx->ctdb);
309
310         return 0;
311 }
312
313
314 /*
315  * Destructor to remove lock request
316  */
317 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
318 {
319         if (lock_request->lctx == NULL) {
320                 return 0;
321         }
322
323         lock_request->lctx->request = NULL;
324         TALLOC_FREE(lock_request->lctx);
325
326         return 0;
327 }
328
329 /*
330  * Process all the callbacks waiting for lock
331  *
332  * If lock has failed, callback is executed with locked=false
333  */
334 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
335 {
336         struct lock_request *request;
337         bool auto_mark = lock_ctx->auto_mark;
338
339         if (auto_mark && locked) {
340                 switch (lock_ctx->type) {
341                 case LOCK_RECORD:
342                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
343                         break;
344
345                 case LOCK_DB:
346                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
347                         break;
348
349                 case LOCK_ALLDB_PRIO:
350                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
351                         break;
352
353                 case LOCK_ALLDB:
354                         ctdb_lockall_mark(lock_ctx->ctdb);
355                         break;
356                 }
357         }
358
359         request = lock_ctx->request;
360         if (auto_mark) {
361                 /* Since request may be freed in the callback, unset the lock
362                  * context, so request destructor will not free lock context.
363                  */
364                 request->lctx = NULL;
365         }
366
367         /* Since request may be freed in the callback, unset the request */
368         lock_ctx->request = NULL;
369
370         request->callback(request->private_data, locked);
371
372         if (!auto_mark) {
373                 return;
374         }
375
376         if (locked) {
377                 switch (lock_ctx->type) {
378                 case LOCK_RECORD:
379                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
380                         break;
381
382                 case LOCK_DB:
383                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
384                         break;
385
386                 case LOCK_ALLDB_PRIO:
387                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
388                         break;
389
390                 case LOCK_ALLDB:
391                         ctdb_lockall_unmark(lock_ctx->ctdb);
392                         break;
393                 }
394         }
395 }
396
397
/*
 * Map a lock latency t (in seconds) to a histogram bucket index.
 *
 * Bucket i covers latencies below the i-th upper bound listed below;
 * anything at or above 64 seconds lands in bucket 10.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        const double upper_bound[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int num_bounds = (int)(sizeof(upper_bound) /
                                     sizeof(upper_bound[0]));
        int bucket;

        for (bucket = 0; bucket < num_bounds; bucket++) {
                if (t < upper_bound[bucket]) {
                        break;
                }
        }

        /* bucket == num_bounds (10) when no bound was larger than t */
        return bucket;
}
429
430 /*
431  * Callback routine when the required locks are obtained.
432  * Called from parent context
433  */
434 static void ctdb_lock_handler(struct tevent_context *ev,
435                             struct tevent_fd *tfd,
436                             uint16_t flags,
437                             void *private_data)
438 {
439         struct lock_context *lock_ctx;
440         TALLOC_CTX *tmp_ctx = NULL;
441         char c;
442         bool locked;
443         double t;
444         int id;
445
446         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
447
448         /* cancel the timeout event */
449         TALLOC_FREE(lock_ctx->ttimer);
450
451         t = timeval_elapsed(&lock_ctx->start_time);
452         id = lock_bucket_id(t);
453
454         if (lock_ctx->auto_mark) {
455                 tmp_ctx = talloc_new(ev);
456                 talloc_steal(tmp_ctx, lock_ctx);
457         }
458
459         /* Read the status from the child process */
460         if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
461                 locked = false;
462         } else {
463                 locked = (c == 0 ? true : false);
464         }
465
466         /* Update statistics */
467         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
468         if (lock_ctx->ctdb_db) {
469                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
470         }
471
472         if (locked) {
473                 if (lock_ctx->ctdb_db) {
474                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
475                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
476                                             lock_type_str[lock_ctx->type], locks.latency,
477                                             lock_ctx->start_time);
478
479                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
480                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
481                 }
482         } else {
483                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
484                 if (lock_ctx->ctdb_db) {
485                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
486                 }
487         }
488
489         process_callbacks(lock_ctx, locked);
490
491         if (lock_ctx->auto_mark) {
492                 talloc_free(tmp_ctx);
493         }
494 }
495
496
497 /*
498  * Callback routine when required locks are not obtained within timeout
499  * Called from parent context
500  */
501 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
502                                     struct tevent_timer *ttimer,
503                                     struct timeval current_time,
504                                     void *private_data)
505 {
506         static char debug_locks[PATH_MAX+1] = "";
507         struct lock_context *lock_ctx;
508         struct ctdb_context *ctdb;
509         pid_t pid;
510         double elapsed_time;
511         int new_timer;
512
513         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
514         ctdb = lock_ctx->ctdb;
515
516         /* If a node stopped/banned, don't spam the logs */
517         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
518                 lock_ctx->ttimer = NULL;
519                 return;
520         }
521
522         elapsed_time = timeval_elapsed(&lock_ctx->start_time);
523         if (lock_ctx->ctdb_db) {
524                 DEBUG(DEBUG_WARNING,
525                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
526                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
527                        lock_ctx->ctdb_db->db_name, elapsed_time));
528         } else {
529                 DEBUG(DEBUG_WARNING,
530                       ("Unable to get ALLDB locks for %.0lf seconds\n",
531                        elapsed_time));
532         }
533
534         if (ctdb_set_helper("lock debugging helper",
535                             debug_locks, sizeof(debug_locks),
536                             "CTDB_DEBUG_LOCKS",
537                             getenv("CTDB_BASE"), "debug_locks.sh")) {
538                 pid = vfork();
539                 if (pid == 0) {
540                         execl(debug_locks, debug_locks, NULL);
541                         _exit(0);
542                 }
543                 ctdb_track_child(ctdb, pid);
544         } else {
545                 DEBUG(DEBUG_WARNING,
546                       (__location__
547                        " Unable to setup lock debugging\n"));
548         }
549
550         /* Back-off logging if lock is not obtained for a long time */
551         if (elapsed_time < 100.0) {
552                 new_timer = 10;
553         } else if (elapsed_time < 1000.0) {
554                 new_timer = 100;
555         } else {
556                 new_timer = 1000;
557         }
558
559         /* reset the timeout timer */
560         // talloc_free(lock_ctx->ttimer);
561         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
562                                             lock_ctx,
563                                             timeval_current_ofs(new_timer, 0),
564                                             ctdb_lock_timeout_handler,
565                                             (void *)lock_ctx);
566 }
567
568
/*
 * Iterator handler that reserves two argument slots per database.
 * private_data points to the int counter to bump. Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
                            void *private_data)
{
        int *slots = private_data;

        *slots = *slots + 2;

        return 0;
}
578
579 static int db_flags(struct ctdb_db_context *ctdb_db)
580 {
581         int tdb_flags = TDB_DEFAULT;
582
583 #ifdef TDB_MUTEX_LOCKING
584         if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
585                 tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
586         }
587 #endif
588         return tdb_flags;
589 }
590
/* Argument array being filled in by db_name_handler() */
struct db_namelist {
        const char **names;     /* array the entries are written to */
        int n;                  /* index of the next free entry */
};
595
596 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
597                            void *private_data)
598 {
599         struct db_namelist *list = (struct db_namelist *)private_data;
600
601         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
602         list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
603                                                  db_flags(ctdb_db));
604         list->n += 2;
605
606         return 0;
607 }
608
609 static bool lock_helper_args(TALLOC_CTX *mem_ctx,
610                              struct lock_context *lock_ctx, int fd,
611                              int *argc, const char ***argv)
612 {
613         struct ctdb_context *ctdb = lock_ctx->ctdb;
614         const char **args = NULL;
615         int nargs, i;
616         int priority;
617         struct db_namelist list;
618
619         switch (lock_ctx->type) {
620         case LOCK_RECORD:
621                 nargs = 6;
622                 break;
623
624         case LOCK_DB:
625                 nargs = 5;
626                 break;
627
628         case LOCK_ALLDB_PRIO:
629                 nargs = 3;
630                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
631                 break;
632
633         case LOCK_ALLDB:
634                 nargs = 3;
635                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
636                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
637                 }
638                 break;
639         }
640
641         /* Add extra argument for null termination */
642         nargs++;
643
644         args = talloc_array(mem_ctx, const char *, nargs);
645         if (args == NULL) {
646                 return false;
647         }
648
649         args[0] = talloc_asprintf(args, "%d", getpid());
650         args[1] = talloc_asprintf(args, "%d", fd);
651
652         switch (lock_ctx->type) {
653         case LOCK_RECORD:
654                 args[2] = talloc_strdup(args, "RECORD");
655                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
656                 args[4] = talloc_asprintf(args, "0x%x",
657                                           db_flags(lock_ctx->ctdb_db));
658                 if (lock_ctx->key.dsize == 0) {
659                         args[5] = talloc_strdup(args, "NULL");
660                 } else {
661                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
662                 }
663                 break;
664
665         case LOCK_DB:
666                 args[2] = talloc_strdup(args, "DB");
667                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
668                 args[4] = talloc_asprintf(args, "0x%x",
669                                           db_flags(lock_ctx->ctdb_db));
670                 break;
671
672         case LOCK_ALLDB_PRIO:
673                 args[2] = talloc_strdup(args, "DB");
674                 list.names = args;
675                 list.n = 3;
676                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
677                 break;
678
679         case LOCK_ALLDB:
680                 args[2] = talloc_strdup(args, "DB");
681                 list.names = args;
682                 list.n = 3;
683                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
684                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
685                 }
686                 break;
687         }
688
689         /* Make sure last argument is NULL */
690         args[nargs-1] = NULL;
691
692         for (i=0; i<nargs-1; i++) {
693                 if (args[i] == NULL) {
694                         talloc_free(args);
695                         return false;
696                 }
697         }
698
699         *argc = nargs;
700         *argv = args;
701         return true;
702 }
703
704 /*
705  * Find a lock request that can be scheduled
706  */
707 static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
708 {
709         struct lock_context *lock_ctx, *next_ctx;
710         struct ctdb_db_context *ctdb_db;
711
712         /* First check if there are database lock requests */
713
714         for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
715              lock_ctx = next_ctx) {
716
717                 if (lock_ctx->request != NULL) {
718                         /* Found a lock context with a request */
719                         return lock_ctx;
720                 }
721
722                 next_ctx = lock_ctx->next;
723
724                 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
725                                    "request\n"));
726                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
727                 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
728                 if (lock_ctx->ctdb_db) {
729                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
730                                                locks.num_pending);
731                 }
732                 talloc_free(lock_ctx);
733         }
734
735         /* Next check database queues */
736         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
737                 if (ctdb_db->lock_num_current ==
738                     ctdb->tunable.lock_processes_per_db) {
739                         continue;
740                 }
741
742                 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
743                      lock_ctx = next_ctx) {
744
745                         next_ctx = lock_ctx->next;
746
747                         if (lock_ctx->request != NULL) {
748                                 return lock_ctx;
749                         }
750
751                         DEBUG(DEBUG_INFO, ("Removing lock context without "
752                                            "lock request\n"));
753                         DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
754                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
755                         CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
756                         talloc_free(lock_ctx);
757                 }
758         }
759
760         return NULL;
761 }
762
763 /*
764  * Schedule a new lock child process
765  * Set up callback handler and timeout handler
766  */
767 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
768 {
769         struct lock_context *lock_ctx;
770         int ret, argc;
771         TALLOC_CTX *tmp_ctx;
772         static char prog[PATH_MAX+1] = "";
773         const char **args;
774
775         if (!ctdb_set_helper("lock helper",
776                              prog, sizeof(prog),
777                              "CTDB_LOCK_HELPER",
778                              CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
779                 ctdb_die(ctdb, __location__
780                          " Unable to set lock helper\n");
781         }
782
783         /* Find a lock context with requests */
784         lock_ctx = ctdb_find_lock_context(ctdb);
785         if (lock_ctx == NULL) {
786                 return;
787         }
788
789         lock_ctx->child = -1;
790         ret = pipe(lock_ctx->fd);
791         if (ret != 0) {
792                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
793                 return;
794         }
795
796         set_close_on_exec(lock_ctx->fd[0]);
797
798         /* Create data for child process */
799         tmp_ctx = talloc_new(lock_ctx);
800         if (tmp_ctx == NULL) {
801                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
802                 close(lock_ctx->fd[0]);
803                 close(lock_ctx->fd[1]);
804                 return;
805         }
806
807         /* Create arguments for lock helper */
808         if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
809                               &argc, &args)) {
810                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
811                 close(lock_ctx->fd[0]);
812                 close(lock_ctx->fd[1]);
813                 talloc_free(tmp_ctx);
814                 return;
815         }
816
817         if (!ctdb_vfork_with_logging(lock_ctx, ctdb, "lock_helper",
818                                      prog, argc, (const char **)args,
819                                      NULL, NULL, &lock_ctx->child)) {
820                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
821                 close(lock_ctx->fd[0]);
822                 close(lock_ctx->fd[1]);
823                 talloc_free(tmp_ctx);
824                 return;
825         }
826
827         /* Parent process */
828         close(lock_ctx->fd[1]);
829
830         talloc_free(tmp_ctx);
831
832         /* Set up timeout handler */
833         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
834                                             lock_ctx,
835                                             timeval_current_ofs(10, 0),
836                                             ctdb_lock_timeout_handler,
837                                             (void *)lock_ctx);
838         if (lock_ctx->ttimer == NULL) {
839                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
840                 lock_ctx->child = -1;
841                 close(lock_ctx->fd[0]);
842                 return;
843         }
844
845         /* Set up callback */
846         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
847                                       lock_ctx,
848                                       lock_ctx->fd[0],
849                                       EVENT_FD_READ,
850                                       ctdb_lock_handler,
851                                       (void *)lock_ctx);
852         if (lock_ctx->tfd == NULL) {
853                 TALLOC_FREE(lock_ctx->ttimer);
854                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
855                 lock_ctx->child = -1;
856                 close(lock_ctx->fd[0]);
857                 return;
858         }
859         tevent_fd_set_auto_close(lock_ctx->tfd);
860
861         /* Move the context from pending to current */
862         if (lock_ctx->type == LOCK_RECORD) {
863                 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
864                 DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
865         } else {
866                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
867                 DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
868         }
869         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
870         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
871         if (lock_ctx->ctdb_db) {
872                 lock_ctx->ctdb_db->lock_num_current++;
873                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
874                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
875         }
876 }
877
878
879 /*
880  * Lock record / db depending on type
881  */
882 static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
883                                                struct ctdb_context *ctdb,
884                                                struct ctdb_db_context *ctdb_db,
885                                                TDB_DATA key,
886                                                uint32_t priority,
887                                                void (*callback)(void *, bool),
888                                                void *private_data,
889                                                enum lock_type type,
890                                                bool auto_mark)
891 {
892         struct lock_context *lock_ctx = NULL;
893         struct lock_request *request;
894
895         if (callback == NULL) {
896                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
897                 return NULL;
898         }
899
900         lock_ctx = talloc_zero(ctdb, struct lock_context);
901         if (lock_ctx == NULL) {
902                 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
903                 return NULL;
904         }
905
906         if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
907                 talloc_free(lock_ctx);
908                 return NULL;
909         }
910
911         lock_ctx->type = type;
912         lock_ctx->ctdb = ctdb;
913         lock_ctx->ctdb_db = ctdb_db;
914         lock_ctx->key.dsize = key.dsize;
915         if (key.dsize > 0) {
916                 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
917                 if (lock_ctx->key.dptr == NULL) {
918                         DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
919                         talloc_free(lock_ctx);
920                         talloc_free(request);
921                         return NULL;
922                 }
923                 lock_ctx->key_hash = ctdb_hash(&key);
924         } else {
925                 lock_ctx->key.dptr = NULL;
926         }
927         lock_ctx->priority = priority;
928         lock_ctx->auto_mark = auto_mark;
929
930         lock_ctx->request = request;
931         lock_ctx->child = -1;
932
933         /* Non-record locks are required by recovery and should be scheduled
934          * immediately, so keep them at the head of the pending queue.
935          */
936         if (lock_ctx->type == LOCK_RECORD) {
937                 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
938         } else {
939                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
940         }
941         CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
942         if (ctdb_db) {
943                 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
944         }
945
946         /* Start the timer when we activate the context */
947         lock_ctx->start_time = timeval_current();
948
949         request->lctx = lock_ctx;
950         request->callback = callback;
951         request->private_data = private_data;
952
953         talloc_set_destructor(request, ctdb_lock_request_destructor);
954         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
955
956         ctdb_lock_schedule(ctdb);
957
958         return request;
959 }
960
961
962 /*
963  * obtain a lock on a record in a database
964  */
965 struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
966                                       struct ctdb_db_context *ctdb_db,
967                                       TDB_DATA key,
968                                       bool auto_mark,
969                                       void (*callback)(void *, bool),
970                                       void *private_data)
971 {
972         return ctdb_lock_internal(mem_ctx,
973                                   ctdb_db->ctdb,
974                                   ctdb_db,
975                                   key,
976                                   0,
977                                   callback,
978                                   private_data,
979                                   LOCK_RECORD,
980                                   auto_mark);
981 }
982
983
984 /*
985  * obtain a lock on a database
986  */
987 struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
988                                   struct ctdb_db_context *ctdb_db,
989                                   bool auto_mark,
990                                   void (*callback)(void *, bool),
991                                   void *private_data)
992 {
993         return ctdb_lock_internal(mem_ctx,
994                                   ctdb_db->ctdb,
995                                   ctdb_db,
996                                   tdb_null,
997                                   0,
998                                   callback,
999                                   private_data,
1000                                   LOCK_DB,
1001                                   auto_mark);
1002 }
1003
1004
1005 /*
1006  * obtain locks on all databases of specified priority
1007  */
1008 struct lock_request *ctdb_lock_alldb_prio(TALLOC_CTX *mem_ctx,
1009                                           struct ctdb_context *ctdb,
1010                                           uint32_t priority,
1011                                           bool auto_mark,
1012                                           void (*callback)(void *, bool),
1013                                           void *private_data)
1014 {
1015         if (priority < 1 || priority > NUM_DB_PRIORITIES) {
1016                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
1017                 return NULL;
1018         }
1019
1020         return ctdb_lock_internal(mem_ctx,
1021                                   ctdb,
1022                                   NULL,
1023                                   tdb_null,
1024                                   priority,
1025                                   callback,
1026                                   private_data,
1027                                   LOCK_ALLDB_PRIO,
1028                                   auto_mark);
1029 }
1030
1031
1032 /*
1033  * obtain locks on all databases
1034  */
1035 struct lock_request *ctdb_lock_alldb(TALLOC_CTX *mem_ctx,
1036                                      struct ctdb_context *ctdb,
1037                                      bool auto_mark,
1038                                      void (*callback)(void *, bool),
1039                                      void *private_data)
1040 {
1041         return ctdb_lock_internal(mem_ctx,
1042                                   ctdb,
1043                                   NULL,
1044                                   tdb_null,
1045                                   0,
1046                                   callback,
1047                                   private_data,
1048                                   LOCK_ALLDB,
1049                                   auto_mark);
1050 }
1051