locking: Do not create multiple lock processes for the same key
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Upper bound on the number of lock helper processes running at once.
 *
 * Note: despite the "_PER_DB" suffix, ctdb_lock_schedule() compares this
 * against the daemon-wide ctdb->lock_num_current counter.
 *
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB               (100)
48
/*
 * Kind of lock a request asks for.
 *
 * The numeric values are used as indices into lock_type_str[], so the
 * order of the enumerators must stay in sync with that table.
 */
enum lock_type {
        LOCK_RECORD,            /* a single record (chain) of one database */
        LOCK_DB,                /* one whole database */
        LOCK_ALLDB_PRIO,        /* all databases of a given priority */
        LOCK_ALLDB,             /* all databases */
};
55
/*
 * Human readable names for each lock type, indexed by enum lock_type.
 * Used as the label when recording lock latency statistics.
 *
 * The entries must stay in the same order as the enumerators of
 * enum lock_type (LOCK_RECORD, LOCK_DB, LOCK_ALLDB_PRIO, LOCK_ALLDB).
 */
static const char * const lock_type_str[] = {
        "lock_record",          /* LOCK_RECORD */
        "lock_db",              /* LOCK_DB */
        "lock_alldb_prio",      /* LOCK_ALLDB_PRIO */
        "lock_alldb",           /* LOCK_ALLDB */
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82 };
83
/*
 * lock_request is the client specific part for a lock request.
 *
 * Each request is queued on its lock_context's req_queue and carries
 * the callback to invoke with the final locking status.
 */
struct lock_request {
        /* Linkage in lctx->req_queue */
        struct lock_request *next;
        struct lock_request *prev;

        /* Context this request is waiting on */
        struct lock_context *lctx;

        /* Invoked as callback(private_data, locked) */
        void (*callback)(void *, bool);
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
/*
 * Per-database callback used by ctdb_db_iterator().
 *
 * Receives the database, the priority the iteration was started with and
 * the caller supplied private_data.  A non-zero return value aborts the
 * iteration.
 */
typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
                            uint32_t priority,
                            void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases - mark only
160  */
161 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                                 void *private_data)
163 {
164         int tdb_transaction_write_lock_mark(struct tdb_context *);
165
166         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
167                            ctdb_db->db_name, priority));
168
169         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
170                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
171                                   ctdb_db->db_name));
172                 return -1;
173         }
174
175         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
176                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
177                                   ctdb_db->db_name));
178                 return -1;
179         }
180
181         return 0;
182 }
183
184 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
185 {
186         /*
187          * This function is only used by the main dameon during recovery.
188          * At this stage, the databases have already been locked, by a
189          * dedicated child process. The freeze_mode variable is used to track
190          * whether the actual locks are held by the child process or not.
191          */
192
193         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
194                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
195                 return -1;
196         }
197
198         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
199 }
200
201 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
202 {
203         uint32_t priority;
204
205         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
206                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
207                         return -1;
208                 }
209         }
210
211         return 0;
212 }
213
214
215 /*
216  * lock all databases - unmark only
217  */
218 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
219                                   void *private_data)
220 {
221         int tdb_transaction_write_lock_unmark(struct tdb_context *);
222
223         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
224                            ctdb_db->db_name, priority));
225
226         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
227                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
228                                   ctdb_db->db_name));
229                 return -1;
230         }
231
232         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
233                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
234                                   ctdb_db->db_name));
235                 return -1;
236         }
237
238         return 0;
239 }
240
241 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
242 {
243         /*
244          * This function is only used by the main dameon during recovery.
245          * At this stage, the databases have already been locked, by a
246          * dedicated child process. The freeze_mode variable is used to track
247          * whether the actual locks are held by the child process or not.
248          */
249
250         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
251                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
252                 return -1;
253         }
254
255         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
256 }
257
258 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
259 {
260         uint32_t priority;
261
262         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
263                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
273
274 /*
275  * Destructor to kill the child locking process
276  */
277 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
278 {
279         if (lock_ctx->child > 0) {
280                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
281                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
282                 lock_ctx->ctdb->lock_num_current--;
283                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
284                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
285                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
286                 }
287         } else {
288                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
289                 lock_ctx->ctdb->lock_num_pending--;
290                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
291                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
292                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
293                 }
294         }
295
296         ctdb_lock_schedule(lock_ctx->ctdb);
297
298         return 0;
299 }
300
301
302 /*
303  * Destructor to remove lock request
304  */
305 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
306 {
307         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
308         return 0;
309 }
310
311
312 void ctdb_lock_free_request_context(struct lock_request *lock_req)
313 {
314         struct lock_context *lock_ctx;
315
316         lock_ctx = lock_req->lctx;
317         talloc_free(lock_req);
318         talloc_free(lock_ctx);
319 }
320
321
322 /*
323  * Process all the callbacks waiting for lock
324  *
325  * If lock has failed, callback is executed with locked=false
326  */
327 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
328 {
329         struct lock_request *request, *next;
330
331         if (lock_ctx->auto_mark && locked) {
332                 switch (lock_ctx->type) {
333                 case LOCK_RECORD:
334                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
335                         break;
336
337                 case LOCK_DB:
338                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
339                         break;
340
341                 case LOCK_ALLDB_PRIO:
342                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
343                         break;
344
345                 case LOCK_ALLDB:
346                         ctdb_lockall_mark(lock_ctx->ctdb);
347                         break;
348                 }
349         }
350
351         /* Iterate through all callbacks */
352         request = lock_ctx->req_queue;
353         while (request) {
354                 if (lock_ctx->auto_mark) {
355                         /* Reset the destructor, so request is not removed from the list */
356                         talloc_set_destructor(request, NULL);
357                 }
358
359                 /* In case, callback frees the request, store next */
360                 next = request->next;
361                 request->callback(request->private_data, locked);
362                 request = next;
363         }
364
365         if (lock_ctx->auto_mark && locked) {
366                 switch (lock_ctx->type) {
367                 case LOCK_RECORD:
368                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
369                         break;
370
371                 case LOCK_DB:
372                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
373                         break;
374
375                 case LOCK_ALLDB_PRIO:
376                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
377                         break;
378
379                 case LOCK_ALLDB:
380                         ctdb_lockall_unmark(lock_ctx->ctdb);
381                         break;
382                 }
383         }
384 }
385
386
/*
 * Map a lock latency t (in seconds) to a histogram bucket index.
 *
 * Buckets 0..9 cover latencies below 1ms, 10ms, 100ms, 1s, 2s, 4s, 8s,
 * 16s, 32s and 64s respectively; bucket 10 collects everything else.
 */
static int lock_bucket_id(double t)
{
        const double ms = 1.e-3, s = 1;
        /* Exclusive upper bound of each bucket, in ascending order */
        const double upper[] = {
                1*ms, 10*ms, 100*ms,
                1*s, 2*s, 4*s, 8*s, 16*s, 32*s, 64*s,
        };
        const int nbounds = (int)(sizeof(upper) / sizeof(upper[0]));
        int id;

        /* First bucket whose bound exceeds t; falls through to nbounds */
        for (id = 0; id < nbounds; id++) {
                if (t < upper[id]) {
                        break;
                }
        }

        return id;
}
418
419 /*
420  * Callback routine when the required locks are obtained.
421  * Called from parent context
422  */
423 static void ctdb_lock_handler(struct tevent_context *ev,
424                             struct tevent_fd *tfd,
425                             uint16_t flags,
426                             void *private_data)
427 {
428         struct lock_context *lock_ctx;
429         TALLOC_CTX *tmp_ctx = NULL;
430         char c;
431         bool locked;
432         double t;
433         int id;
434
435         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
436
437         /* cancel the timeout event */
438         if (lock_ctx->ttimer) {
439                 TALLOC_FREE(lock_ctx->ttimer);
440         }
441
442         t = timeval_elapsed(&lock_ctx->start_time);
443         id = lock_bucket_id(t);
444
445         if (lock_ctx->auto_mark) {
446                 tmp_ctx = talloc_new(ev);
447                 talloc_steal(tmp_ctx, lock_ctx);
448         }
449
450         /* Read the status from the child process */
451         read(lock_ctx->fd[0], &c, 1);
452         locked = (c == 0 ? true : false);
453
454         /* Update statistics */
455         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
456         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
457         if (lock_ctx->ctdb_db) {
458                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
459                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
460         }
461
462         if (locked) {
463                 if (lock_ctx->ctdb_db) {
464                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
465                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
466                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
467                                             lock_type_str[lock_ctx->type], locks.latency,
468                                             lock_ctx->start_time);
469
470                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
471                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
472                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
473                 }
474         } else {
475                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
476                 if (lock_ctx->ctdb_db) {
477                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
478                 }
479         }
480
481         process_callbacks(lock_ctx, locked);
482
483         if (lock_ctx->auto_mark) {
484                 talloc_free(tmp_ctx);
485         }
486 }
487
488
489 /*
490  * Callback routine when required locks are not obtained within timeout
491  * Called from parent context
492  */
493 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
494                                     struct tevent_timer *ttimer,
495                                     struct timeval current_time,
496                                     void *private_data)
497 {
498         const char *cmd = getenv("CTDB_DEBUG_LOCKS");
499         struct lock_context *lock_ctx;
500         struct ctdb_context *ctdb;
501         pid_t pid;
502
503         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
504         ctdb = lock_ctx->ctdb;
505
506         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
507                 DEBUG(DEBUG_WARNING,
508                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
509                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
510                        lock_ctx->ctdb_db->db_name,
511                        timeval_elapsed(&lock_ctx->start_time)));
512         } else {
513                 DEBUG(DEBUG_WARNING,
514                       ("Unable to get ALLDB locks for %.0lf seconds\n",
515                        timeval_elapsed(&lock_ctx->start_time)));
516         }
517
518         /* fire a child process to find the blocking process */
519         if (cmd != NULL) {
520                 pid = fork();
521                 if (pid == 0) {
522                         execl(cmd, cmd, NULL);
523                 }
524         }
525
526         /* reset the timeout timer */
527         // talloc_free(lock_ctx->ttimer);
528         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
529                                             lock_ctx,
530                                             timeval_current_ofs(10, 0),
531                                             ctdb_lock_timeout_handler,
532                                             (void *)lock_ctx);
533 }
534
535
/*
 * db_handler_t callback that counts the databases visited.
 *
 * private_data points to an int counter which is incremented by one per
 * call.  Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
                            void *private_data)
{
        int *counter = private_data;

        *counter += 1;

        return 0;
}
545
/*
 * Cursor used by db_name_handler() to append database paths to an
 * argument vector.
 */
struct db_namelist {
        char **names;   /* array being filled; also the talloc parent of the entries */
        int n;          /* index of the next free slot in names */
};
550
551 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
552                            void *private_data)
553 {
554         struct db_namelist *list = (struct db_namelist *)private_data;
555
556         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
557         list->n++;
558
559         return 0;
560 }
561
562 static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
563 {
564         struct ctdb_context *ctdb = lock_ctx->ctdb;
565         char **args = NULL;
566         int nargs, i;
567         int priority;
568         struct db_namelist list;
569
570         switch (lock_ctx->type) {
571         case LOCK_RECORD:
572                 nargs = 6;
573                 break;
574
575         case LOCK_DB:
576                 nargs = 5;
577                 break;
578
579         case LOCK_ALLDB_PRIO:
580                 nargs = 4;
581                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
582                 break;
583
584         case LOCK_ALLDB:
585                 nargs = 4;
586                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
587                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
588                 }
589                 break;
590         }
591
592         /* Add extra argument for null termination */
593         nargs++;
594
595         args = talloc_array(mem_ctx, char *, nargs);
596         if (args == NULL) {
597                 return NULL;
598         }
599
600         args[0] = talloc_strdup(args, "ctdb_lock_helper");
601         args[1] = talloc_asprintf(args, "%d", getpid());
602         args[2] = talloc_asprintf(args, "%d", fd);
603
604         switch (lock_ctx->type) {
605         case LOCK_RECORD:
606                 args[3] = talloc_strdup(args, "RECORD");
607                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
608                 if (lock_ctx->key.dsize == 0) {
609                         args[5] = talloc_strdup(args, "NULL");
610                 } else {
611                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
612                 }
613                 break;
614
615         case LOCK_DB:
616                 args[3] = talloc_strdup(args, "DB");
617                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
618                 break;
619
620         case LOCK_ALLDB_PRIO:
621                 args[3] = talloc_strdup(args, "DB");
622                 list.names = args;
623                 list.n = 4;
624                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
625                 break;
626
627         case LOCK_ALLDB:
628                 args[3] = talloc_strdup(args, "DB");
629                 list.names = args;
630                 list.n = 4;
631                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
632                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
633                 }
634                 break;
635         }
636
637         /* Make sure last argument is NULL */
638         args[nargs-1] = NULL;
639
640         for (i=0; i<nargs-1; i++) {
641                 if (args[i] == NULL) {
642                         talloc_free(args);
643                         return NULL;
644                 }
645         }
646
647         return args;
648 }
649
650
651 /*
652  * Find the lock context of a given type
653  */
654 static struct lock_context *find_lock_context(struct lock_context *lock_list,
655                                               struct ctdb_db_context *ctdb_db,
656                                               TDB_DATA key,
657                                               uint32_t priority,
658                                               enum lock_type type)
659 {
660         struct lock_context *lock_ctx;
661
662         /* Search active locks */
663         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
664                 if (lock_ctx->type != type) {
665                         continue;
666                 }
667
668                 switch (lock_ctx->type) {
669                 case LOCK_RECORD:
670                         if (ctdb_db == lock_ctx->ctdb_db &&
671                             key.dsize == lock_ctx->key.dsize &&
672                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
673                                 goto done;
674                         }
675                         break;
676
677                 case LOCK_DB:
678                         if (ctdb_db == lock_ctx->ctdb_db) {
679                                 goto done;
680                         }
681                         break;
682
683                 case LOCK_ALLDB_PRIO:
684                         if (priority == lock_ctx->priority) {
685                                 goto done;
686                         }
687                         break;
688
689                 case LOCK_ALLDB:
690                         goto done;
691                         break;
692                 }
693         }
694
695         /* Did not find the lock context we are searching for */
696         lock_ctx = NULL;
697
698 done:
699         return lock_ctx;
700
701 }
702
703
704 /*
705  * Schedule a new lock child process
706  * Set up callback handler and timeout handler
707  */
708 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
709 {
710         struct lock_context *lock_ctx, *next_ctx, *active_ctx;
711         int ret;
712         TALLOC_CTX *tmp_ctx;
713         const char *helper = BINDIR "/ctdb_lock_helper";
714         static const char *prog = NULL;
715         char **args;
716
717         if (prog == NULL) {
718                 const char *t;
719
720                 t = getenv("CTDB_LOCK_HELPER");
721                 if (t != NULL) {
722                         prog = talloc_strdup(ctdb, t);
723                 } else {
724                         prog = talloc_strdup(ctdb, helper);
725                 }
726                 CTDB_NO_MEMORY_VOID(ctdb, prog);
727         }
728
729         if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
730                 return;
731         }
732
733         if (ctdb->lock_pending == NULL) {
734                 return;
735         }
736
737         /* Find a lock context with requests */
738         lock_ctx = ctdb->lock_pending;
739         while (lock_ctx != NULL) {
740                 next_ctx = lock_ctx->next;
741                 if (! lock_ctx->req_queue) {
742                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
743                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
744                         ctdb->lock_num_pending--;
745                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
746                         if (lock_ctx->ctdb_db) {
747                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
748                         }
749                         talloc_free(lock_ctx);
750                 } else {
751                         active_ctx = find_lock_context(ctdb->lock_current, lock_ctx->ctdb_db,
752                                                        lock_ctx->key, lock_ctx->priority,
753                                                        lock_ctx->type);
754                         if (active_ctx == NULL) {
755                                 /* Found a lock context with lock requests */
756                                 break;
757                         }
758
759                         /* There is already a child waiting for the
760                          * same key.  So don't schedule another child
761                          * just yet.
762                          */
763                 }
764                 lock_ctx = next_ctx;
765         }
766
767         if (lock_ctx == NULL) {
768                 return;
769         }
770
771         lock_ctx->child = -1;
772         ret = pipe(lock_ctx->fd);
773         if (ret != 0) {
774                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
775                 return;
776         }
777
778         set_close_on_exec(lock_ctx->fd[0]);
779
780         /* Create data for child process */
781         tmp_ctx = talloc_new(lock_ctx);
782         if (tmp_ctx == NULL) {
783                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
784                 close(lock_ctx->fd[0]);
785                 close(lock_ctx->fd[1]);
786                 return;
787         }
788
789         /* Create arguments for lock helper */
790         args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
791         if (args == NULL) {
792                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
793                 close(lock_ctx->fd[0]);
794                 close(lock_ctx->fd[1]);
795                 talloc_free(tmp_ctx);
796                 return;
797         }
798
799         lock_ctx->child = ctdb_fork(ctdb);
800
801         if (lock_ctx->child == (pid_t)-1) {
802                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
803                 close(lock_ctx->fd[0]);
804                 close(lock_ctx->fd[1]);
805                 talloc_free(tmp_ctx);
806                 return;
807         }
808
809
810         /* Child process */
811         if (lock_ctx->child == 0) {
812                 ret = execv(prog, args);
813                 if (ret < 0) {
814                         DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
815                                           prog, errno, strerror(errno)));
816                 }
817                 _exit(1);
818         }
819
820         /* Parent process */
821         close(lock_ctx->fd[1]);
822
823         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
824
825         talloc_free(tmp_ctx);
826
827         /* Set up timeout handler */
828         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
829                                             lock_ctx,
830                                             timeval_current_ofs(10, 0),
831                                             ctdb_lock_timeout_handler,
832                                             (void *)lock_ctx);
833         if (lock_ctx->ttimer == NULL) {
834                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
835                 lock_ctx->child = -1;
836                 talloc_set_destructor(lock_ctx, NULL);
837                 close(lock_ctx->fd[0]);
838                 return;
839         }
840
841         /* Set up callback */
842         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
843                                       lock_ctx,
844                                       lock_ctx->fd[0],
845                                       EVENT_FD_READ,
846                                       ctdb_lock_handler,
847                                       (void *)lock_ctx);
848         if (lock_ctx->tfd == NULL) {
849                 TALLOC_FREE(lock_ctx->ttimer);
850                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
851                 lock_ctx->child = -1;
852                 talloc_set_destructor(lock_ctx, NULL);
853                 close(lock_ctx->fd[0]);
854                 return;
855         }
856         tevent_fd_set_auto_close(lock_ctx->tfd);
857
858         /* Move the context from pending to current */
859         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
860         ctdb->lock_num_pending--;
861         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
862         ctdb->lock_num_current++;
863 }
864
865
866 /*
867  * Lock record / db depending on type
868  */
869 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
870                                                struct ctdb_db_context *ctdb_db,
871                                                TDB_DATA key,
872                                                uint32_t priority,
873                                                void (*callback)(void *, bool),
874                                                void *private_data,
875                                                enum lock_type type,
876                                                bool auto_mark)
877 {
878         struct lock_context *lock_ctx;
879         struct lock_request *request;
880
881         if (callback == NULL) {
882                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
883                 return NULL;
884         }
885
886         /* get a context for this key - search only the pending contexts,
887          * current contexts might in the middle of processing callbacks */
888         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
889
890         /* No existing context, create one */
891         if (lock_ctx == NULL) {
892                 lock_ctx = talloc_zero(ctdb, struct lock_context);
893                 if (lock_ctx == NULL) {
894                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
895                         return NULL;
896                 }
897
898                 lock_ctx->type = type;
899                 lock_ctx->ctdb = ctdb;
900                 lock_ctx->ctdb_db = ctdb_db;
901                 lock_ctx->key.dsize = key.dsize;
902                 if (key.dsize > 0) {
903                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
904                 } else {
905                         lock_ctx->key.dptr = NULL;
906                 }
907                 lock_ctx->priority = priority;
908                 lock_ctx->auto_mark = auto_mark;
909
910                 lock_ctx->child = -1;
911                 lock_ctx->block_child = -1;
912
913                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
914                 ctdb->lock_num_pending++;
915                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
916                 if (ctdb_db) {
917                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
918                 }
919
920                 /* Start the timer when we activate the context */
921                 lock_ctx->start_time = timeval_current();
922         }
923
924         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
925                 return NULL;
926         }
927
928         request->lctx = lock_ctx;
929         request->callback = callback;
930         request->private_data = private_data;
931
932         talloc_set_destructor(request, ctdb_lock_request_destructor);
933         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
934
935         ctdb_lock_schedule(ctdb);
936
937         return request;
938 }
939
940
941 /*
942  * obtain a lock on a record in a database
943  */
944 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
945                                       TDB_DATA key,
946                                       bool auto_mark,
947                                       void (*callback)(void *, bool),
948                                       void *private_data)
949 {
950         return ctdb_lock_internal(ctdb_db->ctdb,
951                                   ctdb_db,
952                                   key,
953                                   0,
954                                   callback,
955                                   private_data,
956                                   LOCK_RECORD,
957                                   auto_mark);
958 }
959
960
961 /*
962  * obtain a lock on a database
963  */
964 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
965                                   bool auto_mark,
966                                   void (*callback)(void *, bool),
967                                   void *private_data)
968 {
969         return ctdb_lock_internal(ctdb_db->ctdb,
970                                   ctdb_db,
971                                   tdb_null,
972                                   0,
973                                   callback,
974                                   private_data,
975                                   LOCK_DB,
976                                   auto_mark);
977 }
978
979
980 /*
981  * obtain locks on all databases of specified priority
982  */
983 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
984                                           uint32_t priority,
985                                           bool auto_mark,
986                                           void (*callback)(void *, bool),
987                                           void *private_data)
988 {
989         if (priority < 0 || priority > NUM_DB_PRIORITIES) {
990                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
991                 return NULL;
992         }
993
994         return ctdb_lock_internal(ctdb,
995                                   NULL,
996                                   tdb_null,
997                                   priority,
998                                   callback,
999                                   private_data,
1000                                   LOCK_ALLDB_PRIO,
1001                                   auto_mark);
1002 }
1003
1004
1005 /*
1006  * obtain locks on all databases
1007  */
1008 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1009                                      bool auto_mark,
1010                                      void (*callback)(void *, bool),
1011                                      void *private_data)
1012 {
1013         return ctdb_lock_internal(ctdb,
1014                                   NULL,
1015                                   tdb_null,
1016                                   0,
1017                                   callback,
1018                                   private_data,
1019                                   LOCK_ALLDB,
1020                                   auto_mark);
1021 }
1022