if lockwait takes an excessive time to complete. log the time it took to
[metze/ctdb/wip.git] / server / ctdb_lockwait.c
1 /* 
2    wait for a tdb chain lock
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27
28
29 struct lockwait_handle {
30         struct ctdb_context *ctdb;
31         struct ctdb_db_context *ctdb_db;
32         struct fd_event *fde;
33         int fd[2];
34         pid_t child;
35         void *private_data;
36         void (*callback)(void *);
37         TDB_DATA key;
38         struct timeval start_time;
39 };
40
41
42 /* This handler is called when a lockwait child process has completed the lock
43  */
44 static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
45                              uint16_t flags, void *private_data)
46 {
47         struct lockwait_handle *h = talloc_get_type(private_data, 
48                                                      struct lockwait_handle);
49         void (*callback)(void *) = h->callback;
50         void *p = h->private_data;
51         pid_t child = h->child;
52         TDB_DATA key = h->key;
53         struct tdb_context *tdb = h->ctdb_db->ltdb->tdb;
54         TALLOC_CTX *tmp_ctx = talloc_new(ev);
55         double l;
56
57         key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
58
59         talloc_set_destructor(h, NULL);
60
61         /* If a lockwait takes excessive time to complete we want 
62            this logged 
63         */
64         l = timeval_elapsed(&h->start_time);
65         if (l > 1.0) {
66                 DEBUG(0,("Lockwait took %.6f seconds to complete for database %s\n", l, h->ctdb_db->db_name));
67         }
68
69         ctdb_latency(&h->ctdb->statistics.max_lockwait_latency, h->start_time);
70         h->ctdb->statistics.pending_lockwait_calls--;
71
72         /* the handle needs to go away when the context is gone - when
73            the handle goes away this implicitly closes the pipe, which
74            kills the child holding the lock */
75         talloc_steal(tmp_ctx, h);
76
77         if (h->ctdb->flags & CTDB_FLAG_TORTURE) {
78                 if (tdb_chainlock_nonblock(tdb, key) == 0) {
79                         ctdb_fatal(h->ctdb, "got chain lock while lockwait child active");
80                 }
81         }
82
83         tdb_chainlock_mark(tdb, key);
84         callback(p);
85         tdb_chainlock_unmark(tdb, key);
86
87         kill(child, SIGKILL);
88         waitpid(child, NULL, 0);
89         talloc_free(tmp_ctx);
90 }
91
92 static int lockwait_destructor(struct lockwait_handle *h)
93 {
94         h->ctdb->statistics.pending_lockwait_calls--;
95         kill(h->child, SIGKILL);
96         waitpid(h->child, NULL, 0);
97         return 0;
98 }
99
100 /*
101   setup a non-blocking chainlock on a tdb record. If this function
102   returns NULL then it could not get the chainlock. Otherwise it
103   returns a opaque handle, and will call callback() once it has
104   managed to get the chainlock. You can cancel it by using talloc_free
105   on the returned handle.
106
107   It is the callers responsibility to unlock the chainlock once
108   acquired
109  */
110 struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
111                                       TDB_DATA key,
112                                       void (*callback)(void *private_data),
113                                       void *private_data)
114 {
115         struct lockwait_handle *result;
116         int ret;
117         pid_t parent = getpid();
118
119         ctdb_db->ctdb->statistics.lockwait_calls++;
120         ctdb_db->ctdb->statistics.pending_lockwait_calls++;
121
122         if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
123                 ctdb_db->ctdb->statistics.pending_lockwait_calls--;
124                 return NULL;
125         }
126
127         ret = pipe(result->fd);
128
129         if (ret != 0) {
130                 talloc_free(result);
131                 ctdb_db->ctdb->statistics.pending_lockwait_calls--;
132                 return NULL;
133         }
134
135         result->child = fork();
136
137         if (result->child == (pid_t)-1) {
138                 close(result->fd[0]);
139                 close(result->fd[1]);
140                 talloc_free(result);
141                 ctdb_db->ctdb->statistics.pending_lockwait_calls--;
142                 return NULL;
143         }
144
145         result->callback = callback;
146         result->private_data = private_data;
147         result->ctdb = ctdb_db->ctdb;
148         result->ctdb_db = ctdb_db;
149         result->key = key;
150
151         if (result->child == 0) {
152                 char c = 0;
153                 close(result->fd[0]);
154                 tdb_chainlock(ctdb_db->ltdb->tdb, key);
155                 write(result->fd[1], &c, 1);
156                 /* make sure we die when our parent dies */
157                 while (kill(parent, 0) == 0 || errno != ESRCH) {
158                         sleep(5);
159                 }
160                 _exit(0);
161         }
162
163         close(result->fd[1]);
164         talloc_set_destructor(result, lockwait_destructor);
165
166         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
167                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, lockwait_handler,
168                                    (void *)result);
169         if (result->fde == NULL) {
170                 talloc_free(result);
171                 ctdb_db->ctdb->statistics.pending_lockwait_calls--;
172                 return NULL;
173         }
174
175         result->start_time = timeval_current();
176
177         return result;
178 }