r23779: Change from v2 or later to v3 or later.
[gd/samba/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22
23 #ifdef CLUSTER_SUPPORT
24
25 #include "ctdb.h"
26 #include "ctdb_private.h"
27
28 struct db_ctdb_ctx {
29         struct tdb_wrap *wtdb;
30         uint32 db_id;
31         struct ctdbd_connection *conn;
32 };
33
34 struct db_ctdb_rec {
35         struct db_ctdb_ctx *ctdb_ctx;
36         struct ctdb_ltdb_header header;
37 };
38
39 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx);
40
41 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
42 {
43         struct db_ctdb_rec *crec = talloc_get_type_abort(
44                 rec->private_data, struct db_ctdb_rec);
45         TDB_DATA cdata;
46         int ret;
47
48         cdata.dsize = sizeof(crec->header) + data.dsize;
49
50         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
51                 return NT_STATUS_NO_MEMORY;
52         }
53
54         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
55         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
56
57         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
58
59         SAFE_FREE(cdata.dptr);
60
61         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
62 }
63
64 static NTSTATUS db_ctdb_delete(struct db_record *rec)
65 {
66         struct db_ctdb_rec *crec = talloc_get_type_abort(
67                 rec->private_data, struct db_ctdb_rec);
68         TDB_DATA data;
69         int ret;
70
71         /*
72          * We have to store the header with empty data. TODO: Fix the
73          * tdb-level cleanup
74          */
75
76         data.dptr = (uint8 *)&crec->header;
77         data.dsize = sizeof(crec->header);
78
79         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
80
81         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
82 }
83
84 static int db_ctdb_record_destr(struct db_record* data)
85 {
86         struct db_ctdb_rec *crec = talloc_get_type_abort(
87                 data->private_data, struct db_ctdb_rec);
88
89         DEBUG(10, ("Unlocking key %s\n",
90                    hex_encode(data, (unsigned char *)data->key.dptr,
91                               data->key.dsize)));
92
93         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
94                 DEBUG(0, ("tdb_chainunlock failed\n"));
95                 return -1;
96         }
97
98         return 0;
99 }
100
101 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
102                                               TALLOC_CTX *mem_ctx,
103                                               TDB_DATA key)
104 {
105         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
106                                                         struct db_ctdb_ctx);
107         struct db_record *result;
108         struct db_ctdb_rec *crec;
109         NTSTATUS status;
110         TDB_DATA ctdb_data;
111
112         if (!(result = talloc(mem_ctx, struct db_record))) {
113                 DEBUG(0, ("talloc failed\n"));
114                 return NULL;
115         }
116
117         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
118                 DEBUG(0, ("talloc failed\n"));
119                 TALLOC_FREE(result);
120                 return NULL;
121         }
122
123         result->private_data = (void *)crec;
124         crec->ctdb_ctx = ctx;
125
126         result->key.dsize = key.dsize;
127         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
128         if (result->key.dptr == NULL) {
129                 DEBUG(0, ("talloc failed\n"));
130                 TALLOC_FREE(result);
131                 return NULL;
132         }
133
134         /*
135          * Do a blocking lock on the record
136          */
137 again:
138
139         DEBUG(10, ("Locking key %s\n",
140                    hex_encode(result, (unsigned char *)key.dptr,
141                               key.dsize)));
142         
143         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
144                 DEBUG(3, ("tdb_chainlock failed\n"));
145                 TALLOC_FREE(result);
146                 return NULL;
147         }
148
149         result->store = db_ctdb_store;
150         result->delete_rec = db_ctdb_delete;
151         talloc_set_destructor(result, db_ctdb_record_destr);
152
153         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
154
155         /*
156          * See if we have a valid record and we are the dmaster. If so, we can
157          * take the shortcut and just return it.
158          */
159
160         if ((ctdb_data.dptr == NULL) ||
161             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
162             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
163 #if 0
164             || (random() % 2 != 0)
165 #endif
166 ) {
167                 SAFE_FREE(ctdb_data.dptr);
168                 tdb_chainunlock(ctx->wtdb->tdb, key);
169                 talloc_set_destructor(result, NULL);
170
171                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
172                            ctdb_data.dptr, ctdb_data.dptr ?
173                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
174                            get_my_vnn()));
175
176                 status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key);
177                 if (!NT_STATUS_IS_OK(status)) {
178                         DEBUG(5, ("ctdb_migrate failed: %s\n",
179                                   nt_errstr(status)));
180                         TALLOC_FREE(result);
181                         return NULL;
182                 }
183                 /* now its migrated, try again */
184                 goto again;
185         }
186
187         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
188
189         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
190         result->value.dptr = NULL;
191
192         if ((result->value.dsize != 0)
193             && !(result->value.dptr = (uint8 *)talloc_memdup(
194                          result, ctdb_data.dptr + sizeof(crec->header),
195                          result->value.dsize))) {
196                 DEBUG(0, ("talloc failed\n"));
197                 TALLOC_FREE(result);
198         }
199
200         SAFE_FREE(ctdb_data.dptr);
201
202         return result;
203 }
204
205 /*
206   fetch (unlocked, no migration) operation on ctdb
207  */
208 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
209                          TDB_DATA key, TDB_DATA *data)
210 {
211         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
212                                                         struct db_ctdb_ctx);
213         NTSTATUS status;
214         TDB_DATA ctdb_data;
215
216         /* try a direct fetch */
217         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
218
219         /*
220          * See if we have a valid record and we are the dmaster. If so, we can
221          * take the shortcut and just return it.
222          */
223         if ((ctdb_data.dptr != NULL) &&
224             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
225             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) {
226                 /* we are the dmaster - avoid the ctdb protocol op */
227
228                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
229                 if (data->dsize == 0) {
230                         SAFE_FREE(ctdb_data.dptr);
231                         data->dptr = NULL;
232                         return 0;
233                 }
234
235                 data->dptr = (uint8 *)talloc_memdup(
236                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
237                         data->dsize);
238
239                 SAFE_FREE(ctdb_data.dptr);
240
241                 if (data->dptr == NULL) {
242                         return -1;
243                 }
244                 return 0;
245         }
246
247         SAFE_FREE(ctdb_data.dptr);
248
249         /* we weren't able to get it locally - ask ctdb to fetch it for us */
250         status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx,
251                              data);
252         if (!NT_STATUS_IS_OK(status)) {
253                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
254                 return -1;
255         }
256
257         return 0;
258 }
259
260 struct traverse_state {
261         struct db_context *db;
262         int (*fn)(struct db_record *rec, void *private_data);
263         void *private_data;
264 };
265
266 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
267 {
268         struct traverse_state *state = (struct traverse_state *)private_data;
269         struct db_record *rec;
270         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
271         /* we have to give them a locked record to prevent races */
272         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
273         if (rec && rec->value.dsize > 0) {
274                 state->fn(rec, state->private_data);
275         }
276         talloc_free(tmp_ctx);
277 }
278
279 static int db_ctdb_traverse(struct db_context *db,
280                             int (*fn)(struct db_record *rec,
281                                       void *private_data),
282                             void *private_data)
283 {
284         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
285                                                         struct db_ctdb_ctx);
286         struct traverse_state state;
287
288         state.db = db;
289         state.fn = fn;
290         state.private_data = private_data;
291
292         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
293         return 0;
294 }
295
296 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
297 {
298         return NT_STATUS_MEDIA_WRITE_PROTECTED;
299 }
300
301 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
302 {
303         return NT_STATUS_MEDIA_WRITE_PROTECTED;
304 }
305
306 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
307 {
308         struct traverse_state *state = (struct traverse_state *)private_data;
309         struct db_record rec;
310         rec.key = key;
311         rec.value = data;
312         rec.store = db_ctdb_store_deny;
313         rec.delete_rec = db_ctdb_delete_deny;
314         rec.private_data = state->db;
315         state->fn(&rec, state->private_data);
316 }
317
318 static int db_ctdb_traverse_read(struct db_context *db,
319                                  int (*fn)(struct db_record *rec,
320                                            void *private_data),
321                                  void *private_data)
322 {
323         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
324                                                         struct db_ctdb_ctx);
325         struct traverse_state state;
326
327         state.db = db;
328         state.fn = fn;
329         state.private_data = private_data;
330
331         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
332         return 0;
333 }
334
335 static int db_ctdb_get_seqnum(struct db_context *db)
336 {
337         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
338                                                         struct db_ctdb_ctx);
339         return tdb_get_seqnum(ctx->wtdb->tdb);
340 }
341
342 /*
343  * Get the ctdbd connection for a database. If possible, re-use the messaging
344  * ctdbd connection
345  */
346 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx)
347 {
348         struct ctdbd_connection *result;
349
350         result = messaging_ctdbd_connection();
351
352         if (result != NULL) {
353
354                 if (ctx->conn == NULL) {
355                         /*
356                          * Someone has initialized messaging since we
357                          * initialized our own connection, we don't need it
358                          * anymore.
359                          */
360                         TALLOC_FREE(ctx->conn);
361                 }
362
363                 return result;
364         }
365
366         if (ctx->conn == NULL) {
367                 ctdbd_init_connection(ctx, &ctx->conn);
368                 set_my_vnn(ctdbd_vnn(ctx->conn));
369         }
370
371         return ctx->conn;
372 }
373
374 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
375                                 const char *name,
376                                 int hash_size, int tdb_flags,
377                                 int open_flags, mode_t mode)
378 {
379         struct db_context *result;
380         struct db_ctdb_ctx *db_ctdb;
381         char *db_path;
382         NTSTATUS status;
383
384         if (!lp_clustering()) {
385                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
386                 return NULL;
387         }
388
389         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
390                 DEBUG(0, ("talloc failed\n"));
391                 TALLOC_FREE(result);
392                 return NULL;
393         }
394
395         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
396                 DEBUG(0, ("talloc failed\n"));
397                 TALLOC_FREE(result);
398                 return NULL;
399         }
400
401         db_ctdb->conn = NULL;
402
403         status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name,
404                                  &db_ctdb->db_id, tdb_flags);
405
406         if (!NT_STATUS_IS_OK(status)) {
407                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
408                           nt_errstr(status)));
409                 TALLOC_FREE(result);
410                 return NULL;
411         }
412
413         db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb,
414                                db_ctdb->db_id);
415
416         /* only pass through specific flags */
417         tdb_flags &= TDB_SEQNUM;
418
419         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
420         if (db_ctdb->wtdb == NULL) {
421                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
422                 TALLOC_FREE(result);
423                 return NULL;
424         }
425         talloc_free(db_path);
426
427         result->private_data = (void *)db_ctdb;
428         result->fetch_locked = db_ctdb_fetch_locked;
429         result->fetch = db_ctdb_fetch;
430         result->traverse = db_ctdb_traverse;
431         result->traverse_read = db_ctdb_traverse_read;
432         result->get_seqnum = db_ctdb_get_seqnum;
433
434         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
435                  name, db_ctdb->db_id));
436
437         return result;
438 }
439
440 #else
441
442 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
443                                 const char *name,
444                                 int hash_size, int tdb_flags,
445                                 int open_flags, mode_t mode)
446 {
447         DEBUG(0, ("no clustering compiled in\n"));
448         return NULL;
449 }
450
451 #endif