- use separate directories for the tdb files by default
[metze/ctdb/wip.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_fetch_func(struct ctdb_call_info *call)
49 {
50         call->reply_data = &call->record_data;
51         return 0;
52 }
53
54
55 /*
56   attach to a specific database
57 */
58 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
59                                     int open_flags, mode_t mode)
60 {
61         struct ctdb_db_context *ctdb_db, *tmp_db;
62         TDB_DATA data;
63         int ret;
64
65         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
66         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
67
68         ctdb_db->ctdb = ctdb;
69         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
70         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
71
72         data.dptr = discard_const(name);
73         data.dsize = strlen(name);
74         ctdb_db->db_id = ctdb_hash(&data);
75
76         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
77                 if (tmp_db->db_id == ctdb_db->db_id) {
78                         ctdb_set_error(ctdb, "CTDB database hash collission '%s' : '%s'",
79                                         name, tmp_db->db_name);
80                         talloc_free(ctdb_db);
81                         return NULL;
82                 }
83         }
84
85         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
86                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
87                          ctdb->db_directory));
88                 talloc_free(ctdb_db);
89                 return NULL;
90         }
91
92         /* add the node id to the database name, so when we run on loopback
93            we don't conflict in the local filesystem */
94         name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name);
95
96         /* when we have a separate daemon this will need to be a real
97            file, not a TDB_INTERNAL, so the parent can access it to
98            for ltdb bypass */
99         ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode);
100         if (ctdb_db->ltdb == NULL) {
101                 ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
102                 talloc_free(ctdb_db);
103                 return NULL;
104         }
105
106
107         /* 
108           all databases support the "fetch" function. we need this in order to do forced migration of records
109          */
110         ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
111         if (ret != 0) {
112                 talloc_free(ctdb_db);
113                 return NULL;
114         }
115
116         DLIST_ADD(ctdb->db_list, ctdb_db);
117
118         return ctdb_db;
119 }
120
121 /*
122   return the lmaster given a key
123 */
124 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
125 {
126         return ctdb_hash(key) % ctdb->num_nodes;
127 }
128
129
130 /*
131   construct an initial header for a record with no ltdb header yet
132 */
133 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
134                                 TDB_DATA key,
135                                 struct ctdb_ltdb_header *header)
136 {
137         header->rsn = 0;
138         /* initial dmaster is the lmaster */
139         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
140         header->laccessor = header->dmaster;
141         header->lacount = 0;
142 }
143
144
145 /*
146   fetch a record from the ltdb, separating out the header information
147   and returning the body of the record. A valid (initial) header is
148   returned if the record is not present
149 */
150 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
151                     TDB_DATA key, struct ctdb_ltdb_header *header, 
152                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
153 {
154         TDB_DATA rec;
155         struct ctdb_context *ctdb = ctdb_db->ctdb;
156
157         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
158         if (rec.dsize < sizeof(*header)) {
159                 /* return an initial header */
160                 free(rec.dptr);
161                 ltdb_initial_header(ctdb_db, key, header);
162                 if (data) {
163                         data->dptr = NULL;
164                         data->dsize = 0;
165                 }
166                 return 0;
167         }
168
169         *header = *(struct ctdb_ltdb_header *)rec.dptr;
170
171         if (data) {
172                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
173                 data->dptr = talloc_memdup(mem_ctx, 
174                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
175                                            data->dsize);
176         }
177
178         free(rec.dptr);
179         if (data) {
180                 CTDB_NO_MEMORY(ctdb, data->dptr);
181         }
182
183         return 0;
184 }
185
186
187 /*
188   fetch a record from the ltdb, separating out the header information
189   and returning the body of the record. A valid (initial) header is
190   returned if the record is not present
191 */
192 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
193                     struct ctdb_ltdb_header *header, TDB_DATA data)
194 {
195         struct ctdb_context *ctdb = ctdb_db->ctdb;
196         TDB_DATA rec;
197         int ret;
198
199         rec.dsize = sizeof(*header) + data.dsize;
200         rec.dptr = talloc_size(ctdb, rec.dsize);
201         CTDB_NO_MEMORY(ctdb, rec.dptr);
202
203         memcpy(rec.dptr, header, sizeof(*header));
204         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
205
206         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
207         talloc_free(rec.dptr);
208
209         return ret;
210 }
211
212
213 /*
214   lock a record in the ltdb, given a key
215  */
216 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
217 {
218         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
219 }
220
221 /*
222   unlock a record in the ltdb, given a key
223  */
224 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
225 {
226         return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
227 }
228
229 /*
230   called when we should retry the operation
231  */
232 static void lock_fetch_callback(void *p)
233 {
234         struct ctdb_req_header *hdr = p;
235         struct ctdb_context *ctdb = talloc_find_parent_bytype(p, struct ctdb_context);
236         ctdb_recv_pkt(ctdb, (uint8_t *)hdr, hdr->length);
237         DEBUG(0,(__location__ " PACKET REQUEUED\n"));
238 }
239
240 /*
241   do a non-blocking ltdb_fetch with a locked record, deferring this
242   ctdb request until we have the chainlock
243
244   It does the following:
245
246    1) tries to get the chainlock. If it succeeds, then it fetches the record, and 
247       returns 0
248
249    2) if it fails to get a chainlock immediately then it sets up a
250    non-blocking chainlock via ctdb_lockwait, and when it gets the
251    chainlock it re-submits this ctdb request to the main packet
252    receive function
253
254    This effectively queues all ctdb requests that cannot be
255    immediately satisfied until it can get the lock. This means that
256    the main ctdb daemon will not block waiting for a chainlock held by
257    a client
258
259    There are 3 possible return values:
260
261        0:    means that it got the lock immediately.
262       -1:    means that it failed to get the lock, and won't retry
263       -2:    means that it failed to get the lock immediately, but will retry
264  */
265 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
266                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
267                                  struct ctdb_req_header *hdr, TDB_DATA *data)
268 {
269         int ret;
270         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
271         struct lockwait_handle *h;
272         
273         ret = tdb_chainlock_nonblock(tdb, key);
274
275         if (ret != 0 &&
276             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
277                 /* a hard failure - don't try again */
278                 return -1;
279         }
280
281         /* first the non-contended path */
282         if (ret == 0) {
283                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
284                 if (ret != 0) {
285                         tdb_chainunlock(tdb, key);
286                 }       
287                 return ret;
288         }
289
290         /* now the contended path */
291         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, hdr);
292         if (h == NULL) {
293                 tdb_chainunlock(tdb, key);
294                 return -1;
295         }
296
297         /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
298            so it won't be freed yet */
299         talloc_steal(ctdb_db, hdr);
300
301         /* now tell the caller than we will retry asynchronously */
302         return -2;
303 }