35f5902b00ae4c2e51d5e71a2a66122dffbbcde9
[samba.git] / ctdb / server / ctdb_lockwait.c
1 /* 
2    wait for a tdb chain lock
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/wait.h"
23 #include "db_wrap.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "../include/ctdb_private.h"
27
28
29 struct lockwait_handle {
30         struct lockwait_handle *next, *prev;
31         struct ctdb_context *ctdb;
32         struct ctdb_db_context *ctdb_db;
33         struct fd_event *fde;
34         int fd[2];
35         pid_t child;
36         void *private_data;
37         void (*callback)(void *);
38         TDB_DATA key;
39         struct timeval start_time;
40 };
41
42 /* If we managed to obtain a lock, find any overflow records which wanted the
43  * same one and do all the callbacks at once. */
44 static void do_overflow(struct ctdb_db_context *ctdb_db,
45                         TDB_DATA key)
46 {
47         struct lockwait_handle *i, *next;
48         TALLOC_CTX *tmp_ctx = talloc_new(ctdb_db);
49
50         for (i = ctdb_db->lockwait_overflow; i; i = next) {
51                 /* Careful: destructor removes it from list! */
52                 next = i->next;
53                 if (key.dsize == i->key.dsize
54                     && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
55                         /* Callback might free them, so reparent. */
56                         talloc_steal(tmp_ctx, i);
57                         i->callback(i->private_data);
58                 }
59         }
60
61         /* This will free them if callback didn't. */
62         talloc_free(tmp_ctx);
63
64         /* Remove one from the overflow queue if there is one. */
65         if (ctdb_db->lockwait_overflow) {
66                 i = ctdb_db->lockwait_overflow;
67                 ctdb_lockwait(ctdb_db, i->key, i->callback, i->private_data);
68                 talloc_free(i);
69         }
70 }
71
72 static int lockwait_destructor(struct lockwait_handle *h)
73 {
74         CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
75         kill(h->child, SIGKILL);
76         h->ctdb_db->pending_requests--;
77         DLIST_REMOVE(h->ctdb_db->lockwait_active, h);
78         return 0;
79 }
80
81 static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
82                              uint16_t flags, void *private_data)
83 {
84         struct lockwait_handle *h = talloc_get_type(private_data, 
85                                                      struct lockwait_handle);
86         void (*callback)(void *) = h->callback;
87         void *p = h->private_data;
88         TDB_DATA key = h->key;
89         struct tdb_context *tdb = h->ctdb_db->ltdb->tdb;
90         TALLOC_CTX *tmp_ctx = talloc_new(ev);
91
92         key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
93         h->ctdb_db->pending_requests--;
94
95         CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time);
96
97         /* the handle needs to go away when the context is gone - when
98            the handle goes away this implicitly closes the pipe, which
99            kills the child holding the lock */
100         talloc_steal(tmp_ctx, h);
101
102         if (h->ctdb->flags & CTDB_FLAG_TORTURE) {
103                 if (tdb_chainlock_nonblock(tdb, key) == 0) {
104                         ctdb_fatal(h->ctdb, "got chain lock while lockwait child active");
105                 }
106         }
107
108         tdb_chainlock_mark(tdb, key);
109         callback(p);
110         if (h->ctdb_db->lockwait_overflow) {
111                 do_overflow(h->ctdb_db, key);
112         }
113         tdb_chainlock_unmark(tdb, key);
114
115         talloc_free(tmp_ctx);
116 }
117
118
119 static int overflow_lockwait_destructor(struct lockwait_handle *h)
120 {
121         CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
122         DLIST_REMOVE(h->ctdb_db->lockwait_overflow, h);
123         return 0;
124 }
125
126 /*
127   setup a non-blocking chainlock on a tdb record. If this function
128   returns NULL then it could not get the chainlock. Otherwise it
129   returns a opaque handle, and will call callback() once it has
130   managed to get the chainlock. You can cancel it by using talloc_free
131   on the returned handle.
132
133   It is the callers responsibility to unlock the chainlock once
134   acquired
135  */
136 struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
137                                       TDB_DATA key,
138                                       void (*callback)(void *private_data),
139                                       void *private_data)
140 {
141         struct lockwait_handle *result, *i;
142         int ret;
143         pid_t parent = getpid();
144
145         CTDB_INCREMENT_STAT(ctdb_db->ctdb, lockwait_calls);
146         CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
147
148         if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
149                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
150                 return NULL;
151         }
152
153         result->callback = callback;
154         result->private_data = private_data;
155         result->ctdb = ctdb_db->ctdb;
156         result->ctdb_db = ctdb_db;
157         result->key = key;
158
159         /* If we already have a lockwait child for this request, then put this
160            request on the overflow queue straight away
161          */
162         for (i = ctdb_db->lockwait_active; i; i = i->next) {
163                 if (key.dsize == i->key.dsize
164                     && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
165                         DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
166                         talloc_set_destructor(result, overflow_lockwait_destructor);
167                         return result;
168                 }
169         }
170
171         /* Don't fire off too many children at once! */
172         if (ctdb_db->pending_requests > 200) {
173                 DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
174                 talloc_set_destructor(result, overflow_lockwait_destructor);
175                 DEBUG(DEBUG_DEBUG, (__location__ " Created overflow for %s\n",
176                                     ctdb_db->db_name));
177                 return result;
178         }
179
180         ret = pipe(result->fd);
181
182         if (ret != 0) {
183                 talloc_free(result);
184                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
185                 return NULL;
186         }
187
188         result->child = ctdb_fork(ctdb_db->ctdb);
189
190         if (result->child == (pid_t)-1) {
191                 close(result->fd[0]);
192                 close(result->fd[1]);
193                 talloc_free(result);
194                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
195                 return NULL;
196         }
197
198         if (result->child == 0) {
199                 char c = 0;
200                 close(result->fd[0]);
201                 debug_extra = talloc_asprintf(NULL, "chainlock-%s:", ctdb_db->db_name);
202                 tdb_chainlock(ctdb_db->ltdb->tdb, key);
203                 write(result->fd[1], &c, 1);
204                 /* make sure we die when our parent dies */
205                 while (kill(parent, 0) == 0 || errno != ESRCH) {
206                         sleep(5);
207                 }
208                 _exit(0);
209         }
210
211         close(result->fd[1]);
212         set_close_on_exec(result->fd[0]);
213
214         /* This is an active lockwait child process */
215         DLIST_ADD_END(ctdb_db->lockwait_active, result, NULL);
216
217         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
218
219         ctdb_db->pending_requests++;
220         talloc_set_destructor(result, lockwait_destructor);
221
222         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
223                                    EVENT_FD_READ, lockwait_handler,
224                                    (void *)result);
225         if (result->fde == NULL) {
226                 talloc_free(result);
227                 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
228                 return NULL;
229         }
230         tevent_fd_set_auto_close(result->fde);
231
232         result->start_time = timeval_current();
233         return result;
234 }