merge from tridge
[metze/ctdb/wip.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_fetch_func(struct ctdb_call_info *call)
49 {
50         call->reply_data = &call->record_data;
51         return 0;
52 }
53
54
55 /*
56   attach to a specific database
57 */
58 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
59                                     int open_flags, mode_t mode)
60 {
61         struct ctdb_db_context *ctdb_db, *tmp_db;
62         TDB_DATA data;
63         int ret;
64
65         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
66         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
67
68         ctdb_db->ctdb = ctdb;
69         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
70         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
71
72         data.dptr = discard_const(name);
73         data.dsize = strlen(name);
74         ctdb_db->db_id = ctdb_hash(&data);
75
76         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
77                 if (tmp_db->db_id == ctdb_db->db_id) {
78                         ctdb_set_error(ctdb, "CTDB database hash collission '%s' : '%s'",
79                                         name, tmp_db->db_name);
80                         talloc_free(ctdb_db);
81                         return NULL;
82                 }
83         }
84
85         /* add the node id to the database name, so when we run on loopback
86            we don't conflict in the local filesystem */
87         name = talloc_asprintf(ctdb_db, "%s.%u", name, ctdb_get_vnn(ctdb));
88
89         /* when we have a separate daemon this will need to be a real
90            file, not a TDB_INTERNAL, so the parent can access it to
91            for ltdb bypass */
92         ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_DEFAULT, open_flags, mode);
93         if (ctdb_db->ltdb == NULL) {
94                 ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
95                 talloc_free(ctdb_db);
96                 return NULL;
97         }
98
99
100         /* 
101           all databases support the "fetch" function. we need this in order to do forced migration of records
102          */
103         ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
104         if (ret != 0) {
105                 talloc_free(ctdb_db);
106                 return NULL;
107         }
108
109         DLIST_ADD(ctdb->db_list, ctdb_db);
110
111         return ctdb_db;
112 }
113
114 /*
115   return the lmaster given a key
116 */
117 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
118 {
119         return ctdb_hash(key) % ctdb->num_nodes;
120 }
121
122
123 /*
124   construct an initial header for a record with no ltdb header yet
125 */
126 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
127                                 TDB_DATA key,
128                                 struct ctdb_ltdb_header *header)
129 {
130         header->rsn = 0;
131         /* initial dmaster is the lmaster */
132         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
133         header->laccessor = header->dmaster;
134         header->lacount = 0;
135 }
136
137
138 /*
139   fetch a record from the ltdb, separating out the header information
140   and returning the body of the record. A valid (initial) header is
141   returned if the record is not present
142 */
143 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
144                     TDB_DATA key, struct ctdb_ltdb_header *header, 
145                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
146 {
147         TDB_DATA rec;
148         struct ctdb_context *ctdb = ctdb_db->ctdb;
149
150         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
151         if (rec.dsize < sizeof(*header)) {
152                 /* return an initial header */
153                 free(rec.dptr);
154                 ltdb_initial_header(ctdb_db, key, header);
155                 if (data) {
156                         data->dptr = NULL;
157                         data->dsize = 0;
158                 }
159                 return 0;
160         }
161
162         *header = *(struct ctdb_ltdb_header *)rec.dptr;
163
164         if (data) {
165                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
166                 data->dptr = talloc_memdup(mem_ctx, 
167                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
168                                            data->dsize);
169         }
170
171         free(rec.dptr);
172         if (data) {
173                 CTDB_NO_MEMORY(ctdb, data->dptr);
174         }
175
176         return 0;
177 }
178
179
180 /*
181   fetch a record from the ltdb, separating out the header information
182   and returning the body of the record. A valid (initial) header is
183   returned if the record is not present
184 */
185 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
186                     struct ctdb_ltdb_header *header, TDB_DATA data)
187 {
188         struct ctdb_context *ctdb = ctdb_db->ctdb;
189         TDB_DATA rec;
190         int ret;
191
192         rec.dsize = sizeof(*header) + data.dsize;
193         rec.dptr = talloc_size(ctdb, rec.dsize);
194         CTDB_NO_MEMORY(ctdb, rec.dptr);
195
196         memcpy(rec.dptr, header, sizeof(*header));
197         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
198
199         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
200         talloc_free(rec.dptr);
201
202         return ret;
203 }
204
205
206 /*
207   lock a record in the ltdb, given a key
208  */
209 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
210 {
211         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
212 }
213
214 /*
215   unlock a record in the ltdb, given a key
216  */
217 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
218 {
219         return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
220 }
221
222 /*
223   called when we should retry the operation
224  */
225 static void lock_fetch_callback(void *p)
226 {
227         struct ctdb_req_header *hdr = p;
228         struct ctdb_context *ctdb = talloc_find_parent_bytype(p, struct ctdb_context);
229         ctdb_recv_pkt(ctdb, (uint8_t *)hdr, hdr->length);
230         printf("PACKET REQUEUED\n");
231 }
232
233 /*
234   do a non-blocking ltdb_fetch with a locked record, deferring this
235   ctdb request until we have the chainlock
236
237   It does the following:
238
239    1) tries to get the chainlock. If it succeeds, then it fetches the record, and 
240       returns 0
241
242    2) if it fails to get a chainlock immediately then it sets up a
243    non-blocking chainlock via ctdb_lockwait, and when it gets the
244    chainlock it re-submits this ctdb request to the main packet
245    receive function
246
247    This effectively queues all ctdb requests that cannot be
248    immediately satisfied until it can get the lock. This means that
249    the main ctdb daemon will not block waiting for a chainlock held by
250    a client
251
252    There are 3 possible return values:
253
254        0:    means that it got the lock immediately.
255       -1:    means that it failed to get the lock, and won't retry
256       -2:    means that it failed to get the lock immediately, but will retry
257  */
258 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
259                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
260                                  struct ctdb_req_header *hdr, TDB_DATA *data)
261 {
262         int ret;
263         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
264         struct lockwait_handle *h;
265         
266         ret = tdb_chainlock_nonblock(tdb, key);
267
268         if (ret != 0 &&
269             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
270                 /* a hard failure - don't try again */
271                 return -1;
272         }
273
274         /* first the non-contended path */
275         if (ret == 0) {
276                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
277                 if (ret != 0) {
278                         tdb_chainunlock(tdb, key);
279                 }       
280                 return ret;
281         }
282
283         /* now the contended path */
284         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, hdr);
285         if (h == NULL) {
286                 tdb_chainunlock(tdb, key);
287                 return -1;
288         }
289
290         /* we get an extra reference to the packet here, to 
291            stop it being freed in the top level packet handler */
292         if (talloc_reference(ctdb_db, hdr) == NULL) {
293                 talloc_free(h);
294                 return -1;
295         }
296
297         /* now tell the caller than we will retry asynchronously */
298         return -2;
299 }