now that both daemon and client access the database, it needs to be a real disk file
[metze/ctdb/wip.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_fetch_func(struct ctdb_call_info *call)
49 {
50         call->reply_data = &call->record_data;
51         return 0;
52 }
53
54
55 /*
56   attach to a specific database
57 */
58 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
59                                     int open_flags, mode_t mode)
60 {
61         struct ctdb_db_context *ctdb_db, *tmp_db;
62         TDB_DATA data;
63         int ret;
64
65         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
66         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
67
68         ctdb_db->ctdb = ctdb;
69         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
70         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
71
72         data.dptr = discard_const(name);
73         data.dsize = strlen(name);
74         ctdb_db->db_id = ctdb_hash(&data);
75
76         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
77                 if (tmp_db->db_id == ctdb_db->db_id) {
78                         ctdb_set_error(ctdb, "CTDB database hash collission '%s' : '%s'",
79                                         name, tmp_db->db_name);
80                         talloc_free(ctdb_db);
81                         return NULL;
82                 }
83         }
84
85         /* when we have a separate daemon this will need to be a real
86            file, not a TDB_INTERNAL, so the parent can access it to
87            for ltdb bypass */
88         ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_DEFAULT, open_flags, mode);
89         if (ctdb_db->ltdb == NULL) {
90                 ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
91                 talloc_free(ctdb_db);
92                 return NULL;
93         }
94
95
96         /* 
97           all databases support the "fetch" function. we need this in order to do forced migration of records
98          */
99         ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
100         if (ret != 0) {
101                 talloc_free(ctdb_db);
102                 return NULL;
103         }
104
105         DLIST_ADD(ctdb->db_list, ctdb_db);
106
107         return ctdb_db;
108 }
109
110 /*
111   return the lmaster given a key
112 */
113 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
114 {
115         return ctdb_hash(key) % ctdb->num_nodes;
116 }
117
118
119 /*
120   construct an initial header for a record with no ltdb header yet
121 */
122 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
123                                 TDB_DATA key,
124                                 struct ctdb_ltdb_header *header)
125 {
126         header->rsn = 0;
127         /* initial dmaster is the lmaster */
128         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
129         header->laccessor = header->dmaster;
130         header->lacount = 0;
131 }
132
133
134 /*
135   fetch a record from the ltdb, separating out the header information
136   and returning the body of the record. A valid (initial) header is
137   returned if the record is not present
138 */
139 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
140                     TDB_DATA key, struct ctdb_ltdb_header *header, 
141                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
142 {
143         TDB_DATA rec;
144         struct ctdb_context *ctdb = ctdb_db->ctdb;
145
146         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
147         if (rec.dsize < sizeof(*header)) {
148                 /* return an initial header */
149                 free(rec.dptr);
150                 ltdb_initial_header(ctdb_db, key, header);
151                 if (data) {
152                         data->dptr = NULL;
153                         data->dsize = 0;
154                 }
155                 return 0;
156         }
157
158         *header = *(struct ctdb_ltdb_header *)rec.dptr;
159
160         if (data) {
161                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
162                 data->dptr = talloc_memdup(mem_ctx, 
163                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
164                                            data->dsize);
165         }
166
167         free(rec.dptr);
168         if (data) {
169                 CTDB_NO_MEMORY(ctdb, data->dptr);
170         }
171
172         return 0;
173 }
174
175
176 /*
177   fetch a record from the ltdb, separating out the header information
178   and returning the body of the record. A valid (initial) header is
179   returned if the record is not present
180 */
181 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
182                     struct ctdb_ltdb_header *header, TDB_DATA data)
183 {
184         struct ctdb_context *ctdb = ctdb_db->ctdb;
185         TDB_DATA rec;
186         int ret;
187
188         rec.dsize = sizeof(*header) + data.dsize;
189         rec.dptr = talloc_size(ctdb, rec.dsize);
190         CTDB_NO_MEMORY(ctdb, rec.dptr);
191
192         memcpy(rec.dptr, header, sizeof(*header));
193         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
194
195         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
196         talloc_free(rec.dptr);
197
198         return ret;
199 }
200
201
202 /*
203   lock a record in the ltdb, given a key
204  */
205 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
206 {
207         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
208 }
209
210 /*
211   unlock a record in the ltdb, given a key
212  */
213 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
214 {
215         return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
216 }
217
218 /*
219   called when we should retry the operation
220  */
221 static void lock_fetch_callback(void *p)
222 {
223         struct ctdb_req_header *hdr = p;
224         struct ctdb_context *ctdb = talloc_find_parent_bytype(p, struct ctdb_context);
225         ctdb_recv_pkt(ctdb, (uint8_t *)hdr, hdr->length);
226         printf("PACKET REQUEUED\n");
227 }
228
229 /*
230   do a non-blocking ltdb_fetch with a locked record, deferring this
231   ctdb request until we have the chainlock
232
233   It does the following:
234
235    1) tries to get the chainlock. If it succeeds, then it fetches the record, and 
236       returns 0
237
238    2) if it fails to get a chainlock immediately then it sets up a
239    non-blocking chainlock via ctdb_lockwait, and when it gets the
240    chainlock it re-submits this ctdb request to the main packet
241    receive function
242
243    This effectively queues all ctdb requests that cannot be
244    immediately satisfied until it can get the lock. This means that
245    the main ctdb daemon will not block waiting for a chainlock held by
246    a client
247
248    There are 3 possible return values:
249
250        0:    means that it got the lock immediately.
251       -1:    means that it failed to get the lock, and won't retry
252       -2:    means that it failed to get the lock immediately, but will retry
253  */
254 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
255                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
256                                  struct ctdb_req_header *hdr, TDB_DATA *data)
257 {
258         int ret;
259         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
260         struct lockwait_handle *h;
261         
262         ret = tdb_chainlock_nonblock(tdb, key);
263
264         if (ret != 0 &&
265             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
266                 /* a hard failure - don't try again */
267                 return -1;
268         }
269
270         /* first the non-contended path */
271         if (ret == 0) {
272                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
273                 if (ret != 0) {
274                         tdb_chainunlock(tdb, key);
275                 }       
276                 return ret;
277         }
278
279         /* now the contended path */
280         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, hdr);
281         if (h == NULL) {
282                 tdb_chainunlock(tdb, key);
283                 return -1;
284         }
285
286         /* we get an extra reference to the packet here, to 
287            stop it being freed in the top level packet handler */
288         if (talloc_reference(ctdb_db, hdr) == NULL) {
289                 talloc_free(h);
290                 return -1;
291         }
292
293         /* now tell the caller than we will retry asynchronously */
294         return -2;
295 }