ctdb-locking: Update current lock statistics when lock is scheduled
[samba.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Upper bound on the number of lock helper children that may be active
 * for one database at a time; compared against the per-database
 * lock_num_current counter when scheduling.
 *
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB               (100)
48
/*
 * What a lock request covers.  The numeric value is also used as the
 * index into lock_type_str[].
 */
enum lock_type {
	LOCK_RECORD     = 0,	/* a single record of one database */
	LOCK_DB         = 1,	/* one whole database */
	LOCK_ALLDB_PRIO = 2,	/* all databases with a given priority */
	LOCK_ALLDB      = 3,	/* all databases */
};
55
/*
 * Printable name for each lock type, indexed by enum lock_type.
 * Used as the label when lock latency is recorded.
 */
static const char * const lock_type_str[] = {
	"lock_record",		/* LOCK_RECORD */
	"lock_db",		/* LOCK_DB */
	"lock_alldb_prio",	/* LOCK_ALLDB_PRIO */
	"lock_alldb",		/* LOCK_ALLDB */
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82 };
83
/*
 * lock_request is the client specific part for a lock request.
 * It is queued on its lock context's req_queue.
 */
struct lock_request {
	/* linkage within lock_context.req_queue */
	struct lock_request *next;
	struct lock_request *prev;

	/* owning lock context */
	struct lock_context *lctx;

	/* invoked as callback(private_data, locked) */
	void (*callback)(void *, bool);
	void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
/*
 * Per-database callback type taken by ctdb_db_iterator().
 * Returning non-zero aborts the iteration.
 */
typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
			    uint32_t priority,
			    void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases - mark only
160  */
161 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                                 void *private_data)
163 {
164         int tdb_transaction_write_lock_mark(struct tdb_context *);
165
166         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
167                            ctdb_db->db_name, priority));
168
169         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
170                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
171                                   ctdb_db->db_name));
172                 return -1;
173         }
174
175         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
176                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
177                                   ctdb_db->db_name));
178                 return -1;
179         }
180
181         return 0;
182 }
183
184 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
185 {
186         /*
187          * This function is only used by the main dameon during recovery.
188          * At this stage, the databases have already been locked, by a
189          * dedicated child process. The freeze_mode variable is used to track
190          * whether the actual locks are held by the child process or not.
191          */
192
193         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
194                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
195                 return -1;
196         }
197
198         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
199 }
200
201 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
202 {
203         uint32_t priority;
204
205         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
206                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
207                         return -1;
208                 }
209         }
210
211         return 0;
212 }
213
214
215 /*
216  * lock all databases - unmark only
217  */
218 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
219                                   void *private_data)
220 {
221         int tdb_transaction_write_lock_unmark(struct tdb_context *);
222
223         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
224                            ctdb_db->db_name, priority));
225
226         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
227                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
228                                   ctdb_db->db_name));
229                 return -1;
230         }
231
232         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
233                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
234                                   ctdb_db->db_name));
235                 return -1;
236         }
237
238         return 0;
239 }
240
241 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
242 {
243         /*
244          * This function is only used by the main dameon during recovery.
245          * At this stage, the databases have already been locked, by a
246          * dedicated child process. The freeze_mode variable is used to track
247          * whether the actual locks are held by the child process or not.
248          */
249
250         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
251                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
252                 return -1;
253         }
254
255         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
256 }
257
258 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
259 {
260         uint32_t priority;
261
262         for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
263                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
273
274 /*
275  * Destructor to kill the child locking process
276  */
277 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
278 {
279         if (lock_ctx->child > 0) {
280                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
281                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
282                 if (lock_ctx->ctdb_db) {
283                         lock_ctx->ctdb_db->lock_num_current--;
284                 }
285                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
286                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
287                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
288                 }
289         } else {
290                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
291                 lock_ctx->ctdb->lock_num_pending--;
292                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
293                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
294                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
295                 }
296         }
297
298         ctdb_lock_schedule(lock_ctx->ctdb);
299
300         return 0;
301 }
302
303
304 /*
305  * Destructor to remove lock request
306  */
307 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
308 {
309         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
310         return 0;
311 }
312
313
314 void ctdb_lock_free_request_context(struct lock_request *lock_req)
315 {
316         struct lock_context *lock_ctx;
317
318         lock_ctx = lock_req->lctx;
319         talloc_free(lock_req);
320         talloc_free(lock_ctx);
321 }
322
323
324 /*
325  * Process all the callbacks waiting for lock
326  *
327  * If lock has failed, callback is executed with locked=false
328  */
329 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
330 {
331         struct lock_request *request, *next;
332
333         if (lock_ctx->auto_mark && locked) {
334                 switch (lock_ctx->type) {
335                 case LOCK_RECORD:
336                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
337                         break;
338
339                 case LOCK_DB:
340                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
341                         break;
342
343                 case LOCK_ALLDB_PRIO:
344                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
345                         break;
346
347                 case LOCK_ALLDB:
348                         ctdb_lockall_mark(lock_ctx->ctdb);
349                         break;
350                 }
351         }
352
353         /* Iterate through all callbacks */
354         request = lock_ctx->req_queue;
355         while (request) {
356                 if (lock_ctx->auto_mark) {
357                         /* Reset the destructor, so request is not removed from the list */
358                         talloc_set_destructor(request, NULL);
359                 }
360
361                 /* In case, callback frees the request, store next */
362                 next = request->next;
363                 request->callback(request->private_data, locked);
364                 request = next;
365         }
366
367         if (lock_ctx->auto_mark && locked) {
368                 switch (lock_ctx->type) {
369                 case LOCK_RECORD:
370                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
371                         break;
372
373                 case LOCK_DB:
374                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
375                         break;
376
377                 case LOCK_ALLDB_PRIO:
378                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
379                         break;
380
381                 case LOCK_ALLDB:
382                         ctdb_lockall_unmark(lock_ctx->ctdb);
383                         break;
384                 }
385         }
386 }
387
388
/*
 * Map a lock wait time, given in seconds, to a latency bucket index.
 *
 * The buckets are: <1ms, <10ms, <100ms, <1s, <2s, <4s, <8s, <16s, <32s,
 * <64s, and everything else.  Returns a value in the range 0..10.
 */
static int lock_bucket_id(double t)
{
	const double ms = 1.e-3, s = 1;
	/* exclusive upper bound of buckets 0..9, in ascending order */
	const double upper[] = {
		1*ms, 10*ms, 100*ms,
		1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
	};
	const int nbounds = (int)(sizeof(upper) / sizeof(upper[0]));
	int id = 0;

	/* advance until t falls below a bound; otherwise land in the last bucket */
	while (id < nbounds && !(t < upper[id])) {
		id++;
	}

	return id;
}
420
421 /*
422  * Callback routine when the required locks are obtained.
423  * Called from parent context
424  */
425 static void ctdb_lock_handler(struct tevent_context *ev,
426                             struct tevent_fd *tfd,
427                             uint16_t flags,
428                             void *private_data)
429 {
430         struct lock_context *lock_ctx;
431         TALLOC_CTX *tmp_ctx = NULL;
432         char c;
433         bool locked;
434         double t;
435         int id;
436
437         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
438
439         /* cancel the timeout event */
440         if (lock_ctx->ttimer) {
441                 TALLOC_FREE(lock_ctx->ttimer);
442         }
443
444         t = timeval_elapsed(&lock_ctx->start_time);
445         id = lock_bucket_id(t);
446
447         if (lock_ctx->auto_mark) {
448                 tmp_ctx = talloc_new(ev);
449                 talloc_steal(tmp_ctx, lock_ctx);
450         }
451
452         /* Read the status from the child process */
453         if (read(lock_ctx->fd[0], &c, 1) != 1) {
454                 locked = false;
455         } else {
456                 locked = (c == 0 ? true : false);
457         }
458
459         /* Update statistics */
460         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
461         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
462         if (lock_ctx->ctdb_db) {
463                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
464                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
465         }
466
467         if (locked) {
468                 if (lock_ctx->ctdb_db) {
469                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
470                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
471                                             lock_type_str[lock_ctx->type], locks.latency,
472                                             lock_ctx->start_time);
473
474                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
475                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
476                 }
477         } else {
478                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
479                 if (lock_ctx->ctdb_db) {
480                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
481                 }
482         }
483
484         process_callbacks(lock_ctx, locked);
485
486         if (lock_ctx->auto_mark) {
487                 talloc_free(tmp_ctx);
488         }
489 }
490
491
492 /*
493  * Callback routine when required locks are not obtained within timeout
494  * Called from parent context
495  */
496 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
497                                     struct tevent_timer *ttimer,
498                                     struct timeval current_time,
499                                     void *private_data)
500 {
501         static const char * debug_locks = NULL;
502         struct lock_context *lock_ctx;
503         struct ctdb_context *ctdb;
504         pid_t pid;
505
506         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
507         ctdb = lock_ctx->ctdb;
508
509         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
510                 DEBUG(DEBUG_WARNING,
511                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
512                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
513                        lock_ctx->ctdb_db->db_name,
514                        timeval_elapsed(&lock_ctx->start_time)));
515         } else {
516                 DEBUG(DEBUG_WARNING,
517                       ("Unable to get ALLDB locks for %.0lf seconds\n",
518                        timeval_elapsed(&lock_ctx->start_time)));
519         }
520
521         /* Fire a child process to find the blocking process. */
522         if (debug_locks == NULL) {
523                 debug_locks = getenv("CTDB_DEBUG_LOCKS");
524                 if (debug_locks == NULL) {
525                         debug_locks = talloc_asprintf(ctdb,
526                                                       "%s/debug_locks.sh",
527                                                       getenv("CTDB_BASE"));
528                 }
529         }
530         if (debug_locks != NULL) {
531                 pid = fork();
532                 if (pid == 0) {
533                         execl(debug_locks, debug_locks, NULL);
534                 }
535         } else {
536                 DEBUG(DEBUG_WARNING,
537                       (__location__
538                        " Unable to setup lock debugging - no memory?\n"));
539         }
540
541         /* reset the timeout timer */
542         // talloc_free(lock_ctx->ttimer);
543         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
544                                             lock_ctx,
545                                             timeval_current_ofs(10, 0),
546                                             ctdb_lock_timeout_handler,
547                                             (void *)lock_ctx);
548 }
549
550
/*
 * Iterator handler that counts databases: private_data points to an int
 * which is incremented once per call.  Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
			    void *private_data)
{
	int *counter = private_data;

	*counter += 1;

	return 0;
}
560
/* Cursor used by db_name_handler() to fill a string array */
struct db_namelist {
	char **names;	/* destination array; also the talloc parent of the copies */
	int n;		/* index of the next slot to fill */
};
565
566 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
567                            void *private_data)
568 {
569         struct db_namelist *list = (struct db_namelist *)private_data;
570
571         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
572         list->n++;
573
574         return 0;
575 }
576
577 static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
578 {
579         struct ctdb_context *ctdb = lock_ctx->ctdb;
580         char **args = NULL;
581         int nargs, i;
582         int priority;
583         struct db_namelist list;
584
585         switch (lock_ctx->type) {
586         case LOCK_RECORD:
587                 nargs = 6;
588                 break;
589
590         case LOCK_DB:
591                 nargs = 5;
592                 break;
593
594         case LOCK_ALLDB_PRIO:
595                 nargs = 4;
596                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
597                 break;
598
599         case LOCK_ALLDB:
600                 nargs = 4;
601                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
602                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
603                 }
604                 break;
605         }
606
607         /* Add extra argument for null termination */
608         nargs++;
609
610         args = talloc_array(mem_ctx, char *, nargs);
611         if (args == NULL) {
612                 return NULL;
613         }
614
615         args[0] = talloc_strdup(args, "ctdb_lock_helper");
616         args[1] = talloc_asprintf(args, "%d", getpid());
617         args[2] = talloc_asprintf(args, "%d", fd);
618
619         switch (lock_ctx->type) {
620         case LOCK_RECORD:
621                 args[3] = talloc_strdup(args, "RECORD");
622                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
623                 if (lock_ctx->key.dsize == 0) {
624                         args[5] = talloc_strdup(args, "NULL");
625                 } else {
626                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
627                 }
628                 break;
629
630         case LOCK_DB:
631                 args[3] = talloc_strdup(args, "DB");
632                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
633                 break;
634
635         case LOCK_ALLDB_PRIO:
636                 args[3] = talloc_strdup(args, "DB");
637                 list.names = args;
638                 list.n = 4;
639                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
640                 break;
641
642         case LOCK_ALLDB:
643                 args[3] = talloc_strdup(args, "DB");
644                 list.names = args;
645                 list.n = 4;
646                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
647                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
648                 }
649                 break;
650         }
651
652         /* Make sure last argument is NULL */
653         args[nargs-1] = NULL;
654
655         for (i=0; i<nargs-1; i++) {
656                 if (args[i] == NULL) {
657                         talloc_free(args);
658                         return NULL;
659                 }
660         }
661
662         return args;
663 }
664
665
666 /*
667  * Find the lock context of a given type
668  */
669 static struct lock_context *find_lock_context(struct lock_context *lock_list,
670                                               struct ctdb_db_context *ctdb_db,
671                                               TDB_DATA key,
672                                               uint32_t priority,
673                                               enum lock_type type)
674 {
675         struct lock_context *lock_ctx;
676
677         /* Search active locks */
678         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
679                 if (lock_ctx->type != type) {
680                         continue;
681                 }
682
683                 switch (lock_ctx->type) {
684                 case LOCK_RECORD:
685                         if (ctdb_db == lock_ctx->ctdb_db &&
686                             key.dsize == lock_ctx->key.dsize &&
687                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
688                                 goto done;
689                         }
690                         break;
691
692                 case LOCK_DB:
693                         if (ctdb_db == lock_ctx->ctdb_db) {
694                                 goto done;
695                         }
696                         break;
697
698                 case LOCK_ALLDB_PRIO:
699                         if (priority == lock_ctx->priority) {
700                                 goto done;
701                         }
702                         break;
703
704                 case LOCK_ALLDB:
705                         goto done;
706                         break;
707                 }
708         }
709
710         /* Did not find the lock context we are searching for */
711         lock_ctx = NULL;
712
713 done:
714         return lock_ctx;
715
716 }
717
718
719 /*
720  * Schedule a new lock child process
721  * Set up callback handler and timeout handler
722  */
723 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
724 {
725         struct lock_context *lock_ctx, *next_ctx, *active_ctx;
726         int ret;
727         TALLOC_CTX *tmp_ctx;
728         const char *helper = BINDIR "/ctdb_lock_helper";
729         static const char *prog = NULL;
730         char **args;
731
732         if (prog == NULL) {
733                 const char *t;
734
735                 t = getenv("CTDB_LOCK_HELPER");
736                 if (t != NULL) {
737                         prog = talloc_strdup(ctdb, t);
738                 } else {
739                         prog = talloc_strdup(ctdb, helper);
740                 }
741                 CTDB_NO_MEMORY_VOID(ctdb, prog);
742         }
743
744         if (ctdb->lock_pending == NULL) {
745                 return;
746         }
747
748         /* Find a lock context with requests */
749         lock_ctx = ctdb->lock_pending;
750         while (lock_ctx != NULL) {
751                 next_ctx = lock_ctx->next;
752                 if (! lock_ctx->req_queue) {
753                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
754                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
755                         ctdb->lock_num_pending--;
756                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
757                         if (lock_ctx->ctdb_db) {
758                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
759                         }
760                         talloc_free(lock_ctx);
761                 } else {
762                         active_ctx = find_lock_context(ctdb->lock_current, lock_ctx->ctdb_db,
763                                                        lock_ctx->key, lock_ctx->priority,
764                                                        lock_ctx->type);
765                         if (active_ctx == NULL) {
766                                 if (lock_ctx->ctdb_db == NULL ||
767                                     lock_ctx->ctdb_db->lock_num_current < MAX_LOCK_PROCESSES_PER_DB) {
768                                         /* Found a lock context with lock requests */
769                                         break;
770                                 }
771                         }
772
773                         /* There is already a child waiting for the
774                          * same key.  So don't schedule another child
775                          * just yet.
776                          */
777                 }
778                 lock_ctx = next_ctx;
779         }
780
781         if (lock_ctx == NULL) {
782                 return;
783         }
784
785         lock_ctx->child = -1;
786         ret = pipe(lock_ctx->fd);
787         if (ret != 0) {
788                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
789                 return;
790         }
791
792         set_close_on_exec(lock_ctx->fd[0]);
793
794         /* Create data for child process */
795         tmp_ctx = talloc_new(lock_ctx);
796         if (tmp_ctx == NULL) {
797                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
798                 close(lock_ctx->fd[0]);
799                 close(lock_ctx->fd[1]);
800                 return;
801         }
802
803         /* Create arguments for lock helper */
804         args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
805         if (args == NULL) {
806                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
807                 close(lock_ctx->fd[0]);
808                 close(lock_ctx->fd[1]);
809                 talloc_free(tmp_ctx);
810                 return;
811         }
812
813         lock_ctx->child = ctdb_fork(ctdb);
814
815         if (lock_ctx->child == (pid_t)-1) {
816                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
817                 close(lock_ctx->fd[0]);
818                 close(lock_ctx->fd[1]);
819                 talloc_free(tmp_ctx);
820                 return;
821         }
822
823
824         /* Child process */
825         if (lock_ctx->child == 0) {
826                 ret = execv(prog, args);
827                 if (ret < 0) {
828                         DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
829                                           prog, errno, strerror(errno)));
830                 }
831                 _exit(1);
832         }
833
834         /* Parent process */
835         close(lock_ctx->fd[1]);
836
837         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
838
839         talloc_free(tmp_ctx);
840
841         /* Set up timeout handler */
842         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
843                                             lock_ctx,
844                                             timeval_current_ofs(10, 0),
845                                             ctdb_lock_timeout_handler,
846                                             (void *)lock_ctx);
847         if (lock_ctx->ttimer == NULL) {
848                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
849                 lock_ctx->child = -1;
850                 talloc_set_destructor(lock_ctx, NULL);
851                 close(lock_ctx->fd[0]);
852                 return;
853         }
854
855         /* Set up callback */
856         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
857                                       lock_ctx,
858                                       lock_ctx->fd[0],
859                                       EVENT_FD_READ,
860                                       ctdb_lock_handler,
861                                       (void *)lock_ctx);
862         if (lock_ctx->tfd == NULL) {
863                 TALLOC_FREE(lock_ctx->ttimer);
864                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
865                 lock_ctx->child = -1;
866                 talloc_set_destructor(lock_ctx, NULL);
867                 close(lock_ctx->fd[0]);
868                 return;
869         }
870         tevent_fd_set_auto_close(lock_ctx->tfd);
871
872         /* Move the context from pending to current */
873         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
874         ctdb->lock_num_pending--;
875         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
876         if (lock_ctx->ctdb_db) {
877                 lock_ctx->ctdb_db->lock_num_current++;
878                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
879                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
880         }
881 }
882
883
884 /*
885  * Lock record / db depending on type
886  */
887 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
888                                                struct ctdb_db_context *ctdb_db,
889                                                TDB_DATA key,
890                                                uint32_t priority,
891                                                void (*callback)(void *, bool),
892                                                void *private_data,
893                                                enum lock_type type,
894                                                bool auto_mark)
895 {
896         struct lock_context *lock_ctx = NULL;
897         struct lock_request *request;
898
899         if (callback == NULL) {
900                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
901                 return NULL;
902         }
903
904 #if 0
905         /* Disable this optimization to ensure first-in-first-out fair
906          * scheduling of lock requests */
907
908         /* get a context for this key - search only the pending contexts,
909          * current contexts might in the middle of processing callbacks */
910         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
911 #endif
912
913         /* No existing context, create one */
914         if (lock_ctx == NULL) {
915                 lock_ctx = talloc_zero(ctdb, struct lock_context);
916                 if (lock_ctx == NULL) {
917                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
918                         return NULL;
919                 }
920
921                 lock_ctx->type = type;
922                 lock_ctx->ctdb = ctdb;
923                 lock_ctx->ctdb_db = ctdb_db;
924                 lock_ctx->key.dsize = key.dsize;
925                 if (key.dsize > 0) {
926                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
927                 } else {
928                         lock_ctx->key.dptr = NULL;
929                 }
930                 lock_ctx->priority = priority;
931                 lock_ctx->auto_mark = auto_mark;
932
933                 lock_ctx->child = -1;
934                 lock_ctx->block_child = -1;
935
936                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
937                 ctdb->lock_num_pending++;
938                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
939                 if (ctdb_db) {
940                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
941                 }
942
943                 /* Start the timer when we activate the context */
944                 lock_ctx->start_time = timeval_current();
945         }
946
947         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
948                 return NULL;
949         }
950
951         request->lctx = lock_ctx;
952         request->callback = callback;
953         request->private_data = private_data;
954
955         talloc_set_destructor(request, ctdb_lock_request_destructor);
956         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
957
958         ctdb_lock_schedule(ctdb);
959
960         return request;
961 }
962
963
964 /*
965  * obtain a lock on a record in a database
966  */
967 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
968                                       TDB_DATA key,
969                                       bool auto_mark,
970                                       void (*callback)(void *, bool),
971                                       void *private_data)
972 {
973         return ctdb_lock_internal(ctdb_db->ctdb,
974                                   ctdb_db,
975                                   key,
976                                   0,
977                                   callback,
978                                   private_data,
979                                   LOCK_RECORD,
980                                   auto_mark);
981 }
982
983
984 /*
985  * obtain a lock on a database
986  */
987 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
988                                   bool auto_mark,
989                                   void (*callback)(void *, bool),
990                                   void *private_data)
991 {
992         return ctdb_lock_internal(ctdb_db->ctdb,
993                                   ctdb_db,
994                                   tdb_null,
995                                   0,
996                                   callback,
997                                   private_data,
998                                   LOCK_DB,
999                                   auto_mark);
1000 }
1001
1002
1003 /*
1004  * obtain locks on all databases of specified priority
1005  */
1006 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
1007                                           uint32_t priority,
1008                                           bool auto_mark,
1009                                           void (*callback)(void *, bool),
1010                                           void *private_data)
1011 {
1012         if (priority < 1 || priority > NUM_DB_PRIORITIES) {
1013                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
1014                 return NULL;
1015         }
1016
1017         return ctdb_lock_internal(ctdb,
1018                                   NULL,
1019                                   tdb_null,
1020                                   priority,
1021                                   callback,
1022                                   private_data,
1023                                   LOCK_ALLDB_PRIO,
1024                                   auto_mark);
1025 }
1026
1027
1028 /*
1029  * obtain locks on all databases
1030  */
1031 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1032                                      bool auto_mark,
1033                                      void (*callback)(void *, bool),
1034                                      void *private_data)
1035 {
1036         return ctdb_lock_internal(ctdb,
1037                                   NULL,
1038                                   tdb_null,
1039                                   0,
1040                                   callback,
1041                                   private_data,
1042                                   LOCK_ALLDB,
1043                                   auto_mark);
1044 }
1045