ctdb-locking: Add a comment to explain auto_mark usage
[samba.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "lib/tdb_wrap/tdb_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    execute an external script to debug.
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  *                           = false is used for freezing databases for
45  *                           recovery since the recovery cannot start till
46  *                           databases are locked on all the nodes.
47  *                           = true is used for record locks.
48  */
49
/* Kinds of lock that can be requested through this API. */
enum lock_type {
        LOCK_RECORD = 0,
        LOCK_DB = 1,
        LOCK_ALLDB_PRIO = 2,
        LOCK_ALLDB = 3,
};

/* Printable name of each lock type, indexed by enum lock_type. */
static const char * const lock_type_str[] = {
        [LOCK_RECORD]     = "lock_record",
        [LOCK_DB]         = "lock_db",
        [LOCK_ALLDB_PRIO] = "lock_alldb_prio",
        [LOCK_ALLDB]      = "lock_alldb",
};
63
64 struct lock_request;
65
66 /* lock_context is the common part for a lock request */
67 struct lock_context {
68         struct lock_context *next, *prev;
69         enum lock_type type;
70         struct ctdb_context *ctdb;
71         struct ctdb_db_context *ctdb_db;
72         TDB_DATA key;
73         uint32_t priority;
74         bool auto_mark;
75         struct lock_request *request;
76         pid_t child;
77         int fd[2];
78         struct tevent_fd *tfd;
79         struct tevent_timer *ttimer;
80         struct timeval start_time;
81         uint32_t key_hash;
82         bool can_schedule;
83 };
84
/*
 * lock_request is the client specific part of a lock request: the handle
 * given to the caller, plus what to invoke when the outcome is known.
 */
struct lock_request {
        /* owning lock context; NULL once the two have been detached */
        struct lock_context *lctx;
        /* invoked with private_data and the lock status (true = locked) */
        void (*callback)(void *private_data, bool locked);
        /* opaque pointer passed back to callback */
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
117                             uint32_t priority,
118                             void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases - mark only
160  */
161 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                                 void *private_data)
163 {
164         int tdb_transaction_write_lock_mark(struct tdb_context *);
165
166         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
167                            ctdb_db->db_name, priority));
168
169         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
170                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
171                                   ctdb_db->db_name));
172                 return -1;
173         }
174
175         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
176                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
177                                   ctdb_db->db_name));
178                 return -1;
179         }
180
181         return 0;
182 }
183
184 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
185 {
186         /*
187          * This function is only used by the main dameon during recovery.
188          * At this stage, the databases have already been locked, by a
189          * dedicated child process. The freeze_mode variable is used to track
190          * whether the actual locks are held by the child process or not.
191          */
192
193         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
194                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
195                 return -1;
196         }
197
198         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
199 }
200
201 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
202 {
203         uint32_t priority;
204
205         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
206                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
207                         return -1;
208                 }
209         }
210
211         return 0;
212 }
213
214
215 /*
216  * lock all databases - unmark only
217  */
218 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
219                                   void *private_data)
220 {
221         int tdb_transaction_write_lock_unmark(struct tdb_context *);
222
223         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
224                            ctdb_db->db_name, priority));
225
226         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
227                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
228                                   ctdb_db->db_name));
229                 return -1;
230         }
231
232         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
233                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
234                                   ctdb_db->db_name));
235                 return -1;
236         }
237
238         return 0;
239 }
240
241 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
242 {
243         /*
244          * This function is only used by the main daemon during recovery.
245          * At this stage, the databases have already been locked, by a
246          * dedicated child process. The freeze_mode variable is used to track
247          * whether the actual locks are held by the child process or not.
248          */
249
250         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
251                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
252                 return -1;
253         }
254
255         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
256 }
257
258 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
259 {
260         uint32_t priority;
261
262         for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
263                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
273
274 /*
275  * Destructor to kill the child locking process
276  */
277 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
278 {
279         if (lock_ctx->request) {
280                 lock_ctx->request->lctx = NULL;
281         }
282         if (lock_ctx->child > 0) {
283                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
284                 if (lock_ctx->type == LOCK_RECORD) {
285                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
286                 } else {
287                         DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
288                 }
289                 if (lock_ctx->ctdb_db) {
290                         lock_ctx->ctdb_db->lock_num_current--;
291                 }
292                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
293                 if (lock_ctx->ctdb_db) {
294                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
295                 }
296         } else {
297                 if (lock_ctx->type == LOCK_RECORD) {
298                         DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
299                 } else {
300                         DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
301                 }
302                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
303                 if (lock_ctx->ctdb_db) {
304                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
305                 }
306         }
307
308         ctdb_lock_schedule(lock_ctx->ctdb);
309
310         return 0;
311 }
312
313
314 /*
315  * Destructor to remove lock request
316  */
317 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
318 {
319         if (lock_request->lctx == NULL) {
320                 return 0;
321         }
322
323         lock_request->lctx->request = NULL;
324         TALLOC_FREE(lock_request->lctx);
325
326         return 0;
327 }
328
329 /*
330  * Process all the callbacks waiting for lock
331  *
332  * If lock has failed, callback is executed with locked=false
333  */
334 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
335 {
336         struct lock_request *request;
337
338         if (lock_ctx->auto_mark && locked) {
339                 switch (lock_ctx->type) {
340                 case LOCK_RECORD:
341                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
342                         break;
343
344                 case LOCK_DB:
345                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
346                         break;
347
348                 case LOCK_ALLDB_PRIO:
349                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
350                         break;
351
352                 case LOCK_ALLDB:
353                         ctdb_lockall_mark(lock_ctx->ctdb);
354                         break;
355                 }
356         }
357
358         request = lock_ctx->request;
359         if (lock_ctx->auto_mark) {
360                 /* Since request may be freed in the callback, unset the lock
361                  * context, so request destructor will not free lock context.
362                  */
363                 request->lctx = NULL;
364         }
365
366         /* Since request may be freed in the callback, unset the request */
367         lock_ctx->request = NULL;
368
369         request->callback(request->private_data, locked);
370
371         if (lock_ctx->auto_mark && locked) {
372                 switch (lock_ctx->type) {
373                 case LOCK_RECORD:
374                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
375                         break;
376
377                 case LOCK_DB:
378                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
379                         break;
380
381                 case LOCK_ALLDB_PRIO:
382                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
383                         break;
384
385                 case LOCK_ALLDB:
386                         ctdb_lockall_unmark(lock_ctx->ctdb);
387                         break;
388                 }
389         }
390 }
391
392
/*
 * Map a lock latency in seconds to a histogram bucket.
 *
 * Bucket i holds the latencies below the i-th upper bound listed here:
 * 1ms, 10ms, 100ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s.  Anything that is not
 * below 64s goes to bucket 10.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        const double upper[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int nbounds = (int)(sizeof(upper) / sizeof(upper[0]));
        int id;

        for (id = 0; id < nbounds; id++) {
                if (t < upper[id]) {
                        break;
                }
        }

        return id;
}
424
425 /*
426  * Callback routine when the required locks are obtained.
427  * Called from parent context
428  */
429 static void ctdb_lock_handler(struct tevent_context *ev,
430                             struct tevent_fd *tfd,
431                             uint16_t flags,
432                             void *private_data)
433 {
434         struct lock_context *lock_ctx;
435         TALLOC_CTX *tmp_ctx = NULL;
436         char c;
437         bool locked;
438         double t;
439         int id;
440
441         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
442
443         /* cancel the timeout event */
444         TALLOC_FREE(lock_ctx->ttimer);
445
446         t = timeval_elapsed(&lock_ctx->start_time);
447         id = lock_bucket_id(t);
448
449         if (lock_ctx->auto_mark) {
450                 tmp_ctx = talloc_new(ev);
451                 talloc_steal(tmp_ctx, lock_ctx);
452         }
453
454         /* Read the status from the child process */
455         if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
456                 locked = false;
457         } else {
458                 locked = (c == 0 ? true : false);
459         }
460
461         /* Update statistics */
462         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
463         if (lock_ctx->ctdb_db) {
464                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
465         }
466
467         if (locked) {
468                 if (lock_ctx->ctdb_db) {
469                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
470                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
471                                             lock_type_str[lock_ctx->type], locks.latency,
472                                             lock_ctx->start_time);
473
474                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
475                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
476                 }
477         } else {
478                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
479                 if (lock_ctx->ctdb_db) {
480                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
481                 }
482         }
483
484         process_callbacks(lock_ctx, locked);
485
486         if (lock_ctx->auto_mark) {
487                 talloc_free(tmp_ctx);
488         }
489 }
490
491
492 /*
493  * Callback routine when required locks are not obtained within timeout
494  * Called from parent context
495  */
496 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
497                                     struct tevent_timer *ttimer,
498                                     struct timeval current_time,
499                                     void *private_data)
500 {
501         static char debug_locks[PATH_MAX+1] = "";
502         struct lock_context *lock_ctx;
503         struct ctdb_context *ctdb;
504         pid_t pid;
505         double elapsed_time;
506         int new_timer;
507
508         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
509         ctdb = lock_ctx->ctdb;
510
511         /* If a node stopped/banned, don't spam the logs */
512         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
513                 lock_ctx->ttimer = NULL;
514                 return;
515         }
516
517         elapsed_time = timeval_elapsed(&lock_ctx->start_time);
518         if (lock_ctx->ctdb_db) {
519                 DEBUG(DEBUG_WARNING,
520                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
521                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
522                        lock_ctx->ctdb_db->db_name, elapsed_time));
523         } else {
524                 DEBUG(DEBUG_WARNING,
525                       ("Unable to get ALLDB locks for %.0lf seconds\n",
526                        elapsed_time));
527         }
528
529         if (ctdb_set_helper("lock debugging helper",
530                             debug_locks, sizeof(debug_locks),
531                             "CTDB_DEBUG_LOCKS",
532                             getenv("CTDB_BASE"), "debug_locks.sh")) {
533                 pid = vfork();
534                 if (pid == 0) {
535                         execl(debug_locks, debug_locks, NULL);
536                         _exit(0);
537                 }
538                 ctdb_track_child(ctdb, pid);
539         } else {
540                 DEBUG(DEBUG_WARNING,
541                       (__location__
542                        " Unable to setup lock debugging\n"));
543         }
544
545         /* Back-off logging if lock is not obtained for a long time */
546         if (elapsed_time < 100.0) {
547                 new_timer = 10;
548         } else if (elapsed_time < 1000.0) {
549                 new_timer = 100;
550         } else {
551                 new_timer = 1000;
552         }
553
554         /* reset the timeout timer */
555         // talloc_free(lock_ctx->ttimer);
556         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
557                                             lock_ctx,
558                                             timeval_current_ofs(new_timer, 0),
559                                             ctdb_lock_timeout_handler,
560                                             (void *)lock_ctx);
561 }
562
563
/*
 * ctdb_db_iterator() handler: reserve two helper arguments per database.
 * private_data points to the running int argument count.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
                            void *private_data)
{
        int *nargs = private_data;

        *nargs = *nargs + 2;

        return 0;
}
573
574 static int db_flags(struct ctdb_db_context *ctdb_db)
575 {
576         int tdb_flags = TDB_DEFAULT;
577
578 #ifdef TDB_MUTEX_LOCKING
579         if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
580                 tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
581         }
582 #endif
583         return tdb_flags;
584 }
585
586 struct db_namelist {
587         const char **names;
588         int n;
589 };
590
591 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
592                            void *private_data)
593 {
594         struct db_namelist *list = (struct db_namelist *)private_data;
595
596         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
597         list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
598                                                  db_flags(ctdb_db));
599         list->n += 2;
600
601         return 0;
602 }
603
604 static bool lock_helper_args(TALLOC_CTX *mem_ctx,
605                              struct lock_context *lock_ctx, int fd,
606                              int *argc, const char ***argv)
607 {
608         struct ctdb_context *ctdb = lock_ctx->ctdb;
609         const char **args = NULL;
610         int nargs, i;
611         int priority;
612         struct db_namelist list;
613
614         switch (lock_ctx->type) {
615         case LOCK_RECORD:
616                 nargs = 6;
617                 break;
618
619         case LOCK_DB:
620                 nargs = 5;
621                 break;
622
623         case LOCK_ALLDB_PRIO:
624                 nargs = 3;
625                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
626                 break;
627
628         case LOCK_ALLDB:
629                 nargs = 3;
630                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
631                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
632                 }
633                 break;
634         }
635
636         /* Add extra argument for null termination */
637         nargs++;
638
639         args = talloc_array(mem_ctx, const char *, nargs);
640         if (args == NULL) {
641                 return false;
642         }
643
644         args[0] = talloc_asprintf(args, "%d", getpid());
645         args[1] = talloc_asprintf(args, "%d", fd);
646
647         switch (lock_ctx->type) {
648         case LOCK_RECORD:
649                 args[2] = talloc_strdup(args, "RECORD");
650                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
651                 args[4] = talloc_asprintf(args, "0x%x",
652                                           db_flags(lock_ctx->ctdb_db));
653                 if (lock_ctx->key.dsize == 0) {
654                         args[5] = talloc_strdup(args, "NULL");
655                 } else {
656                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
657                 }
658                 break;
659
660         case LOCK_DB:
661                 args[2] = talloc_strdup(args, "DB");
662                 args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
663                 args[4] = talloc_asprintf(args, "0x%x",
664                                           db_flags(lock_ctx->ctdb_db));
665                 break;
666
667         case LOCK_ALLDB_PRIO:
668                 args[2] = talloc_strdup(args, "DB");
669                 list.names = args;
670                 list.n = 3;
671                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
672                 break;
673
674         case LOCK_ALLDB:
675                 args[2] = talloc_strdup(args, "DB");
676                 list.names = args;
677                 list.n = 3;
678                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
679                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
680                 }
681                 break;
682         }
683
684         /* Make sure last argument is NULL */
685         args[nargs-1] = NULL;
686
687         for (i=0; i<nargs-1; i++) {
688                 if (args[i] == NULL) {
689                         talloc_free(args);
690                         return false;
691                 }
692         }
693
694         *argc = nargs;
695         *argv = args;
696         return true;
697 }
698
699 /*
700  * Find a lock request that can be scheduled
701  */
702 static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
703 {
704         struct lock_context *lock_ctx, *next_ctx;
705         struct ctdb_db_context *ctdb_db;
706
707         /* First check if there are database lock requests */
708
709         for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
710              lock_ctx = next_ctx) {
711
712                 if (lock_ctx->request != NULL) {
713                         /* Found a lock context with a request */
714                         return lock_ctx;
715                 }
716
717                 next_ctx = lock_ctx->next;
718
719                 DEBUG(DEBUG_INFO, ("Removing lock context without lock "
720                                    "request\n"));
721                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
722                 CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
723                 if (lock_ctx->ctdb_db) {
724                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
725                                                locks.num_pending);
726                 }
727                 talloc_free(lock_ctx);
728         }
729
730         /* Next check database queues */
731         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
732                 if (ctdb_db->lock_num_current ==
733                     ctdb->tunable.lock_processes_per_db) {
734                         continue;
735                 }
736
737                 for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
738                      lock_ctx = next_ctx) {
739
740                         next_ctx = lock_ctx->next;
741
742                         if (lock_ctx->request != NULL) {
743                                 return lock_ctx;
744                         }
745
746                         DEBUG(DEBUG_INFO, ("Removing lock context without "
747                                            "lock request\n"));
748                         DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
749                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
750                         CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
751                         talloc_free(lock_ctx);
752                 }
753         }
754
755         return NULL;
756 }
757
758 /*
759  * Schedule a new lock child process
760  * Set up callback handler and timeout handler
761  */
762 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
763 {
764         struct lock_context *lock_ctx;
765         int ret, argc;
766         TALLOC_CTX *tmp_ctx;
767         static char prog[PATH_MAX+1] = "";
768         const char **args;
769
770         if (!ctdb_set_helper("lock helper",
771                              prog, sizeof(prog),
772                              "CTDB_LOCK_HELPER",
773                              CTDB_HELPER_BINDIR, "ctdb_lock_helper")) {
774                 ctdb_die(ctdb, __location__
775                          " Unable to set lock helper\n");
776         }
777
778         /* Find a lock context with requests */
779         lock_ctx = ctdb_find_lock_context(ctdb);
780         if (lock_ctx == NULL) {
781                 return;
782         }
783
784         lock_ctx->child = -1;
785         ret = pipe(lock_ctx->fd);
786         if (ret != 0) {
787                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
788                 return;
789         }
790
791         set_close_on_exec(lock_ctx->fd[0]);
792
793         /* Create data for child process */
794         tmp_ctx = talloc_new(lock_ctx);
795         if (tmp_ctx == NULL) {
796                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
797                 close(lock_ctx->fd[0]);
798                 close(lock_ctx->fd[1]);
799                 return;
800         }
801
802         /* Create arguments for lock helper */
803         if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
804                               &argc, &args)) {
805                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
806                 close(lock_ctx->fd[0]);
807                 close(lock_ctx->fd[1]);
808                 talloc_free(tmp_ctx);
809                 return;
810         }
811
812         if (!ctdb_vfork_with_logging(lock_ctx, ctdb, "lock_helper",
813                                      prog, argc, (const char **)args,
814                                      NULL, NULL, &lock_ctx->child)) {
815                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
816                 close(lock_ctx->fd[0]);
817                 close(lock_ctx->fd[1]);
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         /* Parent process */
823         close(lock_ctx->fd[1]);
824
825         talloc_free(tmp_ctx);
826
827         /* Set up timeout handler */
828         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
829                                             lock_ctx,
830                                             timeval_current_ofs(10, 0),
831                                             ctdb_lock_timeout_handler,
832                                             (void *)lock_ctx);
833         if (lock_ctx->ttimer == NULL) {
834                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
835                 lock_ctx->child = -1;
836                 close(lock_ctx->fd[0]);
837                 return;
838         }
839
840         /* Set up callback */
841         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
842                                       lock_ctx,
843                                       lock_ctx->fd[0],
844                                       EVENT_FD_READ,
845                                       ctdb_lock_handler,
846                                       (void *)lock_ctx);
847         if (lock_ctx->tfd == NULL) {
848                 TALLOC_FREE(lock_ctx->ttimer);
849                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
850                 lock_ctx->child = -1;
851                 close(lock_ctx->fd[0]);
852                 return;
853         }
854         tevent_fd_set_auto_close(lock_ctx->tfd);
855
856         /* Move the context from pending to current */
857         if (lock_ctx->type == LOCK_RECORD) {
858                 DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
859                 DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
860         } else {
861                 DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
862                 DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
863         }
864         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
865         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
866         if (lock_ctx->ctdb_db) {
867                 lock_ctx->ctdb_db->lock_num_current++;
868                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
869                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
870         }
871 }
872
873
874 /*
875  * Lock record / db depending on type
876  */
877 static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
878                                                struct ctdb_context *ctdb,
879                                                struct ctdb_db_context *ctdb_db,
880                                                TDB_DATA key,
881                                                uint32_t priority,
882                                                void (*callback)(void *, bool),
883                                                void *private_data,
884                                                enum lock_type type,
885                                                bool auto_mark)
886 {
887         struct lock_context *lock_ctx = NULL;
888         struct lock_request *request;
889
890         if (callback == NULL) {
891                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
892                 return NULL;
893         }
894
895         lock_ctx = talloc_zero(ctdb, struct lock_context);
896         if (lock_ctx == NULL) {
897                 DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
898                 return NULL;
899         }
900
901         if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
902                 talloc_free(lock_ctx);
903                 return NULL;
904         }
905
906         lock_ctx->type = type;
907         lock_ctx->ctdb = ctdb;
908         lock_ctx->ctdb_db = ctdb_db;
909         lock_ctx->key.dsize = key.dsize;
910         if (key.dsize > 0) {
911                 lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
912                 if (lock_ctx->key.dptr == NULL) {
913                         DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
914                         talloc_free(lock_ctx);
915                         talloc_free(request);
916                         return NULL;
917                 }
918                 lock_ctx->key_hash = ctdb_hash(&key);
919         } else {
920                 lock_ctx->key.dptr = NULL;
921         }
922         lock_ctx->priority = priority;
923         lock_ctx->auto_mark = auto_mark;
924
925         lock_ctx->request = request;
926         lock_ctx->child = -1;
927
928         /* Non-record locks are required by recovery and should be scheduled
929          * immediately, so keep them at the head of the pending queue.
930          */
931         if (lock_ctx->type == LOCK_RECORD) {
932                 DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
933         } else {
934                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
935         }
936         CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
937         if (ctdb_db) {
938                 CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
939         }
940
941         /* Start the timer when we activate the context */
942         lock_ctx->start_time = timeval_current();
943
944         request->lctx = lock_ctx;
945         request->callback = callback;
946         request->private_data = private_data;
947
948         talloc_set_destructor(request, ctdb_lock_request_destructor);
949         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
950
951         ctdb_lock_schedule(ctdb);
952
953         return request;
954 }
955
956
957 /*
958  * obtain a lock on a record in a database
959  */
960 struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
961                                       struct ctdb_db_context *ctdb_db,
962                                       TDB_DATA key,
963                                       bool auto_mark,
964                                       void (*callback)(void *, bool),
965                                       void *private_data)
966 {
967         return ctdb_lock_internal(mem_ctx,
968                                   ctdb_db->ctdb,
969                                   ctdb_db,
970                                   key,
971                                   0,
972                                   callback,
973                                   private_data,
974                                   LOCK_RECORD,
975                                   auto_mark);
976 }
977
978
979 /*
980  * obtain a lock on a database
981  */
982 struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
983                                   struct ctdb_db_context *ctdb_db,
984                                   bool auto_mark,
985                                   void (*callback)(void *, bool),
986                                   void *private_data)
987 {
988         return ctdb_lock_internal(mem_ctx,
989                                   ctdb_db->ctdb,
990                                   ctdb_db,
991                                   tdb_null,
992                                   0,
993                                   callback,
994                                   private_data,
995                                   LOCK_DB,
996                                   auto_mark);
997 }
998
999
1000 /*
1001  * obtain locks on all databases of specified priority
1002  */
1003 struct lock_request *ctdb_lock_alldb_prio(TALLOC_CTX *mem_ctx,
1004                                           struct ctdb_context *ctdb,
1005                                           uint32_t priority,
1006                                           bool auto_mark,
1007                                           void (*callback)(void *, bool),
1008                                           void *private_data)
1009 {
1010         if (priority < 1 || priority > NUM_DB_PRIORITIES) {
1011                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
1012                 return NULL;
1013         }
1014
1015         return ctdb_lock_internal(mem_ctx,
1016                                   ctdb,
1017                                   NULL,
1018                                   tdb_null,
1019                                   priority,
1020                                   callback,
1021                                   private_data,
1022                                   LOCK_ALLDB_PRIO,
1023                                   auto_mark);
1024 }
1025
1026
1027 /*
1028  * obtain locks on all databases
1029  */
1030 struct lock_request *ctdb_lock_alldb(TALLOC_CTX *mem_ctx,
1031                                      struct ctdb_context *ctdb,
1032                                      bool auto_mark,
1033                                      void (*callback)(void *, bool),
1034                                      void *private_data)
1035 {
1036         return ctdb_lock_internal(mem_ctx,
1037                                   ctdb,
1038                                   NULL,
1039                                   tdb_null,
1040                                   0,
1041                                   callback,
1042                                   private_data,
1043                                   LOCK_ALLDB,
1044                                   auto_mark);
1045 }
1046