ctdb-daemon: Avoid the use of ctdb->freeze_mode variable
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "lib/tdb_wrap/tdb_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    execute an external script to debug.
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  *                           = false is used for freezing databases for
45  *                           recovery since the recovery cannot start till
46  *                           databases are locked on all the nodes.
47  *                           = true is used for record locks.
48  */
49
/* Kinds of lock that can be requested; the values index lock_type_str[]. */
enum lock_type {
        LOCK_RECORD,
        LOCK_DB,
        LOCK_ALLDB_PRIO,
        LOCK_ALLDB,
};

/* Printable name of each lock type, keyed explicitly by the enum value. */
static const char * const lock_type_str[] = {
        [LOCK_RECORD]     = "lock_record",
        [LOCK_DB]         = "lock_db",
        [LOCK_ALLDB_PRIO] = "lock_alldb_prio",
        [LOCK_ALLDB]      = "lock_alldb",
};
63
64 struct lock_request;
65
/*
 * lock_context is the common part for a lock request.
 *
 * A context is linked on a "pending" list until a lock helper child has been
 * started for it, and on a "current" list while that child is running.
 * Record locks use the per-database lists, every other type uses the lists
 * in the ctdb context.
 */
struct lock_context {
        /* Linkage for the pending/current lists (DLIST_* macros) */
        struct lock_context *next, *prev;
        /* What is being locked: record, single db or all dbs */
        enum lock_type type;
        struct ctdb_context *ctdb;
        /* Database being locked; the code treats NULL as "no single db" */
        struct ctdb_db_context *ctdb_db;
        /* Record key, passed hex-encoded to the helper for LOCK_RECORD */
        TDB_DATA key;
        /* Database priority, used for LOCK_ALLDB_PRIO */
        uint32_t priority;
        /* Whether to mark/unmark the locks around the callback */
        bool auto_mark;
        /* Client part of the request; NULL once it has been detached */
        struct lock_request *request;
        /* Helper child pid; > 0 only while a helper is running */
        pid_t child;
        /* Pipe to the helper: fd[0] is read by the daemon, fd[1] is
         * handed to the helper */
        int fd[2];
        /* Read event on fd[0], fires when the helper reports status */
        struct tevent_fd *tfd;
        /* Timer used to report locks that take too long */
        struct tevent_timer *ttimer;
        /* Reference time for latency statistics and timeout messages */
        struct timeval start_time;
        /* NOTE(review): not referenced in the visible part of this file */
        uint32_t key_hash;
        /* NOTE(review): not referenced in the visible part of this file */
        bool can_schedule;
};
84
/*
 * lock_request is the client specific part for a lock request.
 *
 * It is what the caller holds on to; freeing it also frees the lock
 * context it still points to (see ctdb_lock_request_destructor()).
 */
struct lock_request {
        /* Lock context serving this request, NULL once detached */
        struct lock_context *lctx;
        /* Invoked with private_data and the locking status (true = locked) */
        void (*callback)(void *, bool);
        /* Opaque pointer handed back to the callback */
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 int ctdb_db_prio_iterator(struct ctdb_context *ctdb, uint32_t priority,
117                           ctdb_db_handler_t handler, void *private_data)
118 {
119         struct ctdb_db_context *ctdb_db;
120         int ret;
121
122         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
123                 if (ctdb_db->priority != priority) {
124                         continue;
125                 }
126                 if (later_db(ctdb, ctdb_db->db_name)) {
127                         continue;
128                 }
129                 ret = handler(ctdb_db, private_data);
130                 if (ret != 0) {
131                         return -1;
132                 }
133         }
134
135         /* If priority != 1, later_db check is not required and can return */
136         if (priority != 1) {
137                 return 0;
138         }
139
140         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
141                 if (!later_db(ctdb, ctdb_db->db_name)) {
142                         continue;
143                 }
144                 ret = handler(ctdb_db, private_data);
145                 if (ret != 0) {
146                         return -1;
147                 }
148         }
149
150         return 0;
151 }
152
153 int ctdb_db_iterator(struct ctdb_context *ctdb, ctdb_db_handler_t handler,
154                      void *private_data)
155 {
156         struct ctdb_db_context *ctdb_db;
157         int ret;
158
159         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
160                 ret = handler(ctdb_db, private_data);
161                 if (ret != 0) {
162                         return -1;
163                 }
164         }
165
166         return 0;
167 }
168
169 /*
170  * lock all databases - mark only
171  */
172 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db,
173                                 void *private_data)
174 {
175         int tdb_transaction_write_lock_mark(struct tdb_context *);
176
177         DEBUG(DEBUG_INFO, ("marking locked database %s\n", ctdb_db->db_name));
178
179         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
180                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
181                                   ctdb_db->db_name));
182                 return -1;
183         }
184
185         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
186                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
187                                   ctdb_db->db_name));
188                 return -1;
189         }
190
191         return 0;
192 }
193
194 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
195 {
196         /*
197          * This function is only used by the main dameon during recovery.
198          * At this stage, the databases have already been locked, by a
199          * dedicated child process.
200          */
201
202         if (!ctdb_db_prio_frozen(ctdb, priority)) {
203                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
204                 return -1;
205         }
206
207         return ctdb_db_prio_iterator(ctdb, priority, db_lock_mark_handler, NULL);
208 }
209
210 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
211 {
212         uint32_t priority;
213
214         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
215                 int ret;
216
217                 ret = ctdb_db_prio_iterator(ctdb, priority,
218                                             db_lock_mark_handler, NULL);
219                 if (ret != 0) {
220                         return -1;
221                 }
222         }
223
224         return 0;
225 }
226
227
228 /*
229  * lock all databases - unmark only
230  */
231 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db,
232                                   void *private_data)
233 {
234         int tdb_transaction_write_lock_unmark(struct tdb_context *);
235
236         DEBUG(DEBUG_INFO, ("unmarking locked database %s\n", ctdb_db->db_name));
237
238         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
239                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
240                                   ctdb_db->db_name));
241                 return -1;
242         }
243
244         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
245                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
246                                   ctdb_db->db_name));
247                 return -1;
248         }
249
250         return 0;
251 }
252
253 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
254 {
255         /*
256          * This function is only used by the main daemon during recovery.
257          * At this stage, the databases have already been locked, by a
258          * dedicated child process.
259          */
260
261         if (!ctdb_db_prio_frozen(ctdb, priority)) {
262                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
263                 return -1;
264         }
265
266         return ctdb_db_prio_iterator(ctdb, priority, db_lock_unmark_handler,
267                                      NULL);
268 }
269
270 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
271 {
272         uint32_t priority;
273
274         for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
275                 int ret;
276
277                 ret = ctdb_db_prio_iterator(ctdb, priority,
278                                             db_lock_unmark_handler, NULL);
279                 if (ret != 0) {
280                         return -1;
281                 }
282         }
283
284         return 0;
285 }
286
287
288 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
289
290 /*
291  * Destructor to kill the child locking process
292  */
293 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
294 {
295         if (lock_ctx->request) {
296                 lock_ctx->request->lctx = NULL;
297         }
298         if (lock_ctx->child > 0) {
299                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
300                 if (lock_ctx->type == LOCK_RECORD) {
301                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
302                 } else {
303                         DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
304                 }
305                 if (lock_ctx->ctdb_db) {
306                         lock_ctx->ctdb_db->lock_num_current--;
307                 }
308                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
309                 if (lock_ctx->ctdb_db) {
310                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
311                 }
312         } else {
313                 if (lock_ctx->type == LOCK_RECORD) {
314                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
315                 } else {
316                         DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
317                 }
318                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
319                 if (lock_ctx->ctdb_db) {
320                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
321                 }
322         }
323
324         ctdb_lock_schedule(lock_ctx->ctdb);
325
326         return 0;
327 }
328
329
330 /*
331  * Destructor to remove lock request
332  */
333 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
334 {
335         if (lock_request->lctx == NULL) {
336                 return 0;
337         }
338
339         lock_request->lctx->request = NULL;
340         TALLOC_FREE(lock_request->lctx);
341
342         return 0;
343 }
344
/*
 * Run the callback of the request attached to a lock context
 *
 * If lock has failed, callback is executed with locked=false
 *
 * With auto_mark set, the locks are marked before the callback (only when
 * locked is true), unmarked again afterwards, and the lock context is freed
 * here.  Without auto_mark the lock context is left alone and the function
 * returns right after the callback.
 */
static void process_callbacks(struct lock_context *lock_ctx, bool locked)
{
        struct lock_request *request;
        /* Cached up front; it is still needed after the callback has run */
        bool auto_mark = lock_ctx->auto_mark;

        /* Mark the locks that were obtained, according to the lock type */
        if (auto_mark && locked) {
                switch (lock_ctx->type) {
                case LOCK_RECORD:
                        tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
                        break;

                case LOCK_DB:
                        tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
                        break;

                case LOCK_ALLDB_PRIO:
                        ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
                        break;

                case LOCK_ALLDB:
                        ctdb_lockall_mark(lock_ctx->ctdb);
                        break;
                }
        }

        request = lock_ctx->request;
        if (auto_mark) {
                /* Since request may be freed in the callback, unset the lock
                 * context, so request destructor will not free lock context.
                 */
                request->lctx = NULL;
        }

        /* Since request may be freed in the callback, unset the request */
        lock_ctx->request = NULL;

        request->callback(request->private_data, locked);

        /* Without auto_mark, unmarking and freeing are not done here */
        if (!auto_mark) {
                return;
        }

        /* Undo the marks set above, in the same per-type fashion */
        if (locked) {
                switch (lock_ctx->type) {
                case LOCK_RECORD:
                        tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
                        break;

                case LOCK_DB:
                        tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
                        break;

                case LOCK_ALLDB_PRIO:
                        ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
                        break;

                case LOCK_ALLDB:
                        ctdb_lockall_unmark(lock_ctx->ctdb);
                        break;
                }
        }

        /* The context destructor takes care of the helper child and lists */
        talloc_free(lock_ctx);
}
414
415
/*
 * Map a lock latency in seconds to a histogram bucket index.
 *
 * Bucket i holds the latencies below the i-th limit (1ms, 10ms, 100ms, 1s,
 * 2s, 4s, 8s, 16s, 32s, 64s); everything else lands in bucket 10.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        const double limit[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int num_limits = (int)(sizeof(limit) / sizeof(limit[0]));
        int id;

        for (id = 0; id < num_limits; id++) {
                if (t < limit[id]) {
                        break;
                }
        }

        /* id equals num_limits (10) when no limit was larger than t */
        return id;
}
447
448 /*
449  * Callback routine when the required locks are obtained.
450  * Called from parent context
451  */
452 static void ctdb_lock_handler(struct tevent_context *ev,
453                             struct tevent_fd *tfd,
454                             uint16_t flags,
455                             void *private_data)
456 {
457         struct lock_context *lock_ctx;
458         char c;
459         bool locked;
460         double t;
461         int id;
462
463         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
464
465         /* cancel the timeout event */
466         TALLOC_FREE(lock_ctx->ttimer);
467
468         t = timeval_elapsed(&lock_ctx->start_time);
469         id = lock_bucket_id(t);
470
471         /* Read the status from the child process */
472         if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
473                 locked = false;
474         } else {
475                 locked = (c == 0 ? true : false);
476         }
477
478         /* Update statistics */
479         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
480         if (lock_ctx->ctdb_db) {
481                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
482         }
483
484         if (locked) {
485                 if (lock_ctx->ctdb_db) {
486                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
487                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
488                                             lock_type_str[lock_ctx->type], locks.latency,
489                                             lock_ctx->start_time);
490
491                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
492                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
493                 }
494         } else {
495                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
496                 if (lock_ctx->ctdb_db) {
497                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
498                 }
499         }
500
501         process_callbacks(lock_ctx, locked);
502 }
503
504
505 /*
506  * Callback routine when required locks are not obtained within timeout
507  * Called from parent context
508  */
509 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
510                                     struct tevent_timer *ttimer,
511                                     struct timeval current_time,
512                                     void *private_data)
513 {
514         static char debug_locks[PATH_MAX+1] = "";
515         struct lock_context *lock_ctx;
516         struct ctdb_context *ctdb;
517         pid_t pid;
518         double elapsed_time;
519         int new_timer;
520
521         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
522         ctdb = lock_ctx->ctdb;
523
524         /* If a node stopped/banned, don't spam the logs */
525         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
526                 lock_ctx->ttimer = NULL;
527                 return;
528         }
529
530         elapsed_time = timeval_elapsed(&lock_ctx->start_time);
531         if (lock_ctx->ctdb_db) {
532                 DEBUG(DEBUG_WARNING,
533                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
534                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
535                        lock_ctx->ctdb_db->db_name, elapsed_time));
536         } else {
537                 DEBUG(DEBUG_WARNING,
538                       ("Unable to get ALLDB locks for %.0lf seconds\n",
539                        elapsed_time));
540         }
541
542         if (ctdb_set_helper("lock debugging helper",
543                             debug_locks, sizeof(debug_locks),
544                             "CTDB_DEBUG_LOCKS",
545                             getenv("CTDB_BASE"), "debug_locks.sh")) {
546                 pid = vfork();
547                 if (pid == 0) {
548                         execl(debug_locks, debug_locks, NULL);
549                         _exit(0);
550                 }
551                 ctdb_track_child(ctdb, pid);
552         } else {
553                 DEBUG(DEBUG_WARNING,
554                       (__location__
555                        " Unable to setup lock debugging\n"));
556         }
557
558         /* Back-off logging if lock is not obtained for a long time */
559         if (elapsed_time < 100.0) {
560                 new_timer = 10;
561         } else if (elapsed_time < 1000.0) {
562                 new_timer = 100;
563         } else {
564                 new_timer = 1000;
565         }
566
567         /* reset the timeout timer */
568         // talloc_free(lock_ctx->ttimer);
569         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
570                                             lock_ctx,
571                                             timeval_current_ofs(new_timer, 0),
572                                             ctdb_lock_timeout_handler,
573                                             (void *)lock_ctx);
574 }
575
576
/*
 * Iterator handler used to size the helper argument list: every database
 * contributes two arguments (see db_name_handler()), so the int counter
 * that private_data points to is bumped by 2.  Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, void *private_data)
{
        int *num_args = private_data;

        *num_args += 2;

        return 0;
}
585
586 static int db_flags(struct ctdb_db_context *ctdb_db)
587 {
588         int tdb_flags = TDB_DEFAULT;
589
590 #ifdef TDB_MUTEX_LOCKING
591         if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
592                 tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
593         }
594 #endif
595         return tdb_flags;
596 }
597
/*
 * Cursor into a helper argument array that is being filled in by
 * db_name_handler().
 */
struct db_namelist {
        /* Argument array; also used as talloc parent for the new strings */
        const char **names;
        /* Index of the next free slot in names */
        int n;
};
602
603 static int db_name_handler(struct ctdb_db_context *ctdb_db, void *private_data)
604 {
605         struct db_namelist *list = (struct db_namelist *)private_data;
606
607         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
608         list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
609                                                  db_flags(ctdb_db));
610         list->n += 2;
611
612         return 0;
613 }
614
615 static bool lock_helper_args(TALLOC_CTX *mem_ctx,
616                              struct lock_context *lock_ctx, int fd,
617                              int *argc, const char ***argv)
618 {
619         struct ctdb_context *ctdb = lock_ctx->ctdb;
620         const char **args = NULL;
621         int nargs, i;
622         int priority;
623         struct db_namelist list;
624
625         switch (lock_ctx->type) {
626         case LOCK_RECORD:
627                 nargs = 6;
628                 break;
629
630         case LOCK_DB:
631                 nargs = 5;
632                 break;
633
634         case LOCK_ALLDB_PRIO:
635                 nargs = 3;
636                 ctdb_db_prio_iterator(ctdb, lock_ctx->priority,
637                                       db_count_handler, &nargs);
638                 break;
639
640         case LOCK_ALLDB:
641                 nargs = 3;
642                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
643                         ctdb_db_prio_iterator(ctdb, priority,
644                                               db_count_handler, &nargs);
645                 }
646                 break;
647         }
648
649         /* Add extra argument for null termination */
650         nargs++;
651
652         args = talloc_array(mem_ctx, const char *, nargs);
653         if (args == NULL) {
654                 return false;
655         }
656
657         args[0] = talloc_asprintf(args, "%d", getpid());
658         args[1] = talloc_asprintf(args, "%d", fd);
659
660         switch (lock_ctx->type) {
661         case LOCK_RECORD:
662                 args[2] = talloc_strdup(args, "RECORD");
663                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
664                 args[4] = talloc_asprintf(args, "0x%x",
665                                           db_flags(lock_ctx->ctdb_db));
666                 if (lock_ctx->key.dsize == 0) {
667                         args[5] = talloc_strdup(args, "NULL");
668                 } else {
669                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
670                 }
671                 break;
672
673         case LOCK_DB:
674                 args[2] = talloc_strdup(args, "DB");
675                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
676                 args[4] = talloc_asprintf(args, "0x%x",
677                                           db_flags(lock_ctx->ctdb_db));
678                 break;
679
680         case LOCK_ALLDB_PRIO:
681                 args[2] = talloc_strdup(args, "DB");
682                 list.names = args;
683                 list.n = 3;
684                 ctdb_db_prio_iterator(ctdb, lock_ctx->priority,
685                                       db_name_handler, &list);
686                 break;
687
688         case LOCK_ALLDB:
689                 args[2] = talloc_strdup(args, "DB");
690                 list.names = args;
691                 list.n = 3;
692                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
693                         ctdb_db_prio_iterator(ctdb, priority,
694                                               db_name_handler, &list);
695                 }
696                 break;
697         }
698
699         /* Make sure last argument is NULL */
700         args[nargs-1] = NULL;
701
702         for (i=0; i<nargs-1; i++) {
703                 if (args[i] == NULL) {
704                         talloc_free(args);
705                         return false;
706                 }
707         }
708
709         *argc = nargs;
710         *argv = args;
711         return true;
712 }
713
714 /*
715  * Find a lock request that can be scheduled
716  */
717 static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
718 {
719         struct lock_context *lock_ctx, *next_ctx;
720         struct ctdb_db_context *ctdb_db;
721
722         /* First check if there are database lock requests */
723
724         for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
725              lock_ctx = next_ctx) {
726
727                 if (lock_ctx->request != NULL) {
728                         /* Found a lock context with a request */
729                         return lock_ctx;
730                 }
731
732                 next_ctx = lock_ctx->next;
733
734                 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
735                                    "request\n"));
736                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
737                 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
738                 if (lock_ctx->ctdb_db) {
739                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
740                                                locks.num_pending);
741                 }
742                 talloc_free(lock_ctx);
743         }
744
745         /* Next check database queues */
746         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
747                 if (ctdb_db->lock_num_current ==
748                     ctdb->tunable.lock_processes_per_db) {
749                         continue;
750                 }
751
752                 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
753                      lock_ctx = next_ctx) {
754
755                         next_ctx = lock_ctx->next;
756
757                         if (lock_ctx->request != NULL) {
758                                 return lock_ctx;
759                         }
760
761                         DEBUG(DEBUG_INFO, ("Removing lock context without "
762                                            "lock request\n"));
763                         DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
764                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
765                         CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
766                         talloc_free(lock_ctx);
767                 }
768         }
769
770         return NULL;
771 }
772
/*
 * Schedule a new lock child process
 * Set up callback handler and timeout handler
 *
 * Picks the next schedulable lock context, creates the status pipe, starts
 * the lock helper and registers the timeout timer and the fd read event.
 * Only when all of that worked is the context moved from its pending list
 * to the matching current list.  On any failure the function just returns
 * and the context stays on its pending list with child set to -1.
 *
 * Dies (ctdb_die) if the lock helper path cannot be set up.
 */
static void ctdb_lock_schedule(struct ctdb_context *ctdb)
{
        struct lock_context *lock_ctx;
        int ret, argc;
        TALLOC_CTX *tmp_ctx;
        /* Helper path is resolved by ctdb_set_helper() and kept across calls */
        static char prog[PATH_MAX+1] = "";
        const char **args;

        if (!ctdb_set_helper("lock helper",
                             prog, sizeof(prog),
                             "CTDB_LOCK_HELPER",
                             CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
                ctdb_die(ctdb, __location__
                         " Unable to set lock helper\n");
        }

        /* Find a lock context with requests */
        lock_ctx = ctdb_find_lock_context(ctdb);
        if (lock_ctx == NULL) {
                return;
        }

        /* No child yet; the destructor treats child <= 0 as "pending" */
        lock_ctx->child = -1;
        ret = pipe(lock_ctx->fd);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
                return;
        }

        /* Only the write end, fd[1], is meant for the helper */
        set_close_on_exec(lock_ctx->fd[0]);

        /* Create data for child process */
        tmp_ctx = talloc_new(lock_ctx);
        if (tmp_ctx == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
                return;
        }

        /* Create arguments for lock helper */
        if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
                              &argc, &args)) {
                DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
                talloc_free(tmp_ctx);
                return;
        }

        /* Start the helper; presumably its pid is stored in lock_ctx->child */
        if (!ctdb_vfork_with_logging(lock_ctx, ctdb, "lock_helper",
                                     prog, argc, (const char **)args,
                                     NULL, NULL, &lock_ctx->child)) {
                DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
                talloc_free(tmp_ctx);
                return;
        }

        /* Parent process */
        /* The write end belongs to the helper, the daemon only reads */
        close(lock_ctx->fd[1]);

        talloc_free(tmp_ctx);

        /* Set up timeout handler */
        lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
                                            lock_ctx,
                                            timeval_current_ofs(10, 0),
                                            ctdb_lock_timeout_handler,
                                            (void *)lock_ctx);
        if (lock_ctx->ttimer == NULL) {
                /* Undo the helper start; context remains pending */
                ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
                lock_ctx->child = -1;
                close(lock_ctx->fd[0]);
                return;
        }

        /* Set up callback */
        lock_ctx->tfd = tevent_add_fd(ctdb->ev,
                                      lock_ctx,
                                      lock_ctx->fd[0],
                                      EVENT_FD_READ,
                                      ctdb_lock_handler,
                                      (void *)lock_ctx);
        if (lock_ctx->tfd == NULL) {
                /* Undo timer and helper start; context remains pending */
                TALLOC_FREE(lock_ctx->ttimer);
                ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
                lock_ctx->child = -1;
                close(lock_ctx->fd[0]);
                return;
        }
        /* From here on fd[0] is closed together with the fd event */
        tevent_fd_set_auto_close(lock_ctx->tfd);

        /* Move the context from pending to current */
        if (lock_ctx->type == LOCK_RECORD) {
                DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
                DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
        } else {
                DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
                DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
        }
        /* Keep the pending/current counters in step with the list move */
        CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
        CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
        if (lock_ctx->ctdb_db) {
                lock_ctx->ctdb_db->lock_num_current++;
                CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
                CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
        }
}
887
888
889 /*
890  * Lock record / db depending on type
891  */
892 static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
893                                                struct ctdb_context *ctdb,
894                                                struct ctdb_db_context *ctdb_db,
895                                                TDB_DATA key,
896                                                uint32_t priority,
897                                                void (*callback)(void *, bool),
898                                                void *private_data,
899                                                enum lock_type type,
900                                                bool auto_mark)
901 {
902         struct lock_context *lock_ctx = NULL;
903         struct lock_request *request;
904
905         if (callback == NULL) {
906                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
907                 return NULL;
908         }
909
910         lock_ctx = talloc_zero(ctdb, struct lock_context);
911         if (lock_ctx == NULL) {
912                 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
913                 return NULL;
914         }
915
916         if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
917                 talloc_free(lock_ctx);
918                 return NULL;
919         }
920
921         lock_ctx->type = type;
922         lock_ctx->ctdb = ctdb;
923         lock_ctx->ctdb_db = ctdb_db;
924         lock_ctx->key.dsize = key.dsize;
925         if (key.dsize > 0) {
926                 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
927                 if (lock_ctx->key.dptr == NULL) {
928                         DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
929                         talloc_free(lock_ctx);
930                         talloc_free(request);
931                         return NULL;
932                 }
933                 lock_ctx->key_hash = ctdb_hash(&key);
934         } else {
935                 lock_ctx->key.dptr = NULL;
936         }
937         lock_ctx->priority = priority;
938         lock_ctx->auto_mark = auto_mark;
939
940         lock_ctx->request = request;
941         lock_ctx->child = -1;
942
943         /* Non-record locks are required by recovery and should be scheduled
944          * immediately, so keep them at the head of the pending queue.
945          */
946         if (lock_ctx->type == LOCK_RECORD) {
947                 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
948         } else {
949                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
950         }
951         CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
952         if (ctdb_db) {
953                 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
954         }
955
956         /* Start the timer when we activate the context */
957         lock_ctx->start_time = timeval_current();
958
959         request->lctx = lock_ctx;
960         request->callback = callback;
961         request->private_data = private_data;
962
963         talloc_set_destructor(request, ctdb_lock_request_destructor);
964         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
965
966         ctdb_lock_schedule(ctdb);
967
968         return request;
969 }
970
971
972 /*
973  * obtain a lock on a record in a database
974  */
975 struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
976                                       struct ctdb_db_context *ctdb_db,
977                                       TDB_DATA key,
978                                       bool auto_mark,
979                                       void (*callback)(void *, bool),
980                                       void *private_data)
981 {
982         return ctdb_lock_internal(mem_ctx,
983                                   ctdb_db->ctdb,
984                                   ctdb_db,
985                                   key,
986                                   0,
987                                   callback,
988                                   private_data,
989                                   LOCK_RECORD,
990                                   auto_mark);
991 }
992
993
994 /*
995  * obtain a lock on a database
996  */
997 struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
998                                   struct ctdb_db_context *ctdb_db,
999                                   bool auto_mark,
1000                                   void (*callback)(void *, bool),
1001                                   void *private_data)
1002 {
1003         return ctdb_lock_internal(mem_ctx,
1004                                   ctdb_db->ctdb,
1005                                   ctdb_db,
1006                                   tdb_null,
1007                                   0,
1008                                   callback,
1009                                   private_data,
1010                                   LOCK_DB,
1011                                   auto_mark);
1012 }
1013
1014
1015 /*
1016  * obtain locks on all databases of specified priority
1017  */
1018 struct lock_request *ctdb_lock_alldb_prio(TALLOC_CTX *mem_ctx,
1019                                           struct ctdb_context *ctdb,
1020                                           uint32_t priority,
1021                                           bool auto_mark,
1022                                           void (*callback)(void *, bool),
1023                                           void *private_data)
1024 {
1025         if (priority < 1 || priority > NUM_DB_PRIORITIES) {
1026                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
1027                 return NULL;
1028         }
1029
1030         return ctdb_lock_internal(mem_ctx,
1031                                   ctdb,
1032                                   NULL,
1033                                   tdb_null,
1034                                   priority,
1035                                   callback,
1036                                   private_data,
1037                                   LOCK_ALLDB_PRIO,
1038                                   auto_mark);
1039 }
1040
1041
1042 /*
1043  * obtain locks on all databases
1044  */
1045 struct lock_request *ctdb_lock_alldb(TALLOC_CTX *mem_ctx,
1046                                      struct ctdb_context *ctdb,
1047                                      bool auto_mark,
1048                                      void (*callback)(void *, bool),
1049                                      void *private_data)
1050 {
1051         return ctdb_lock_internal(mem_ctx,
1052                                   ctdb,
1053                                   NULL,
1054                                   tdb_null,
1055                                   0,
1056                                   callback,
1057                                   private_data,
1058                                   LOCK_ALLDB,
1059                                   auto_mark);
1060 }
1061