Merge branch 'v3-2-test' of ssh://git.samba.org/data/git/samba into v3-2-test
[jra/samba/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
37 {
38         struct db_ctdb_rec *crec = talloc_get_type_abort(
39                 rec->private_data, struct db_ctdb_rec);
40         TDB_DATA cdata;
41         int ret;
42
43         cdata.dsize = sizeof(crec->header) + data.dsize;
44
45         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
46                 return NT_STATUS_NO_MEMORY;
47         }
48
49         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
50         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
51
52         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
53
54         SAFE_FREE(cdata.dptr);
55
56         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
57 }
58
59
60 /* for persistent databases the store is a bit different. We have to
61    ask the ctdb daemon to push the record to all nodes after the
62    store */
63 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
64 {
65         struct db_ctdb_rec *crec = talloc_get_type_abort(
66                 rec->private_data, struct db_ctdb_rec);
67         TDB_DATA cdata;
68         int ret;
69         NTSTATUS status;
70
71         cdata.dsize = sizeof(crec->header) + data.dsize;
72
73         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
74                 return NT_STATUS_NO_MEMORY;
75         }
76
77         crec->header.rsn++;
78
79         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
80         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
81
82         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
83         status = (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
84         
85         /* now tell ctdbd to update this record on all other nodes */
86         if (NT_STATUS_IS_OK(status)) {
87                 status = ctdbd_persistent_store(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
88         }
89
90         SAFE_FREE(cdata.dptr);
91
92         return status;
93 }
94
95 static NTSTATUS db_ctdb_delete(struct db_record *rec)
96 {
97         struct db_ctdb_rec *crec = talloc_get_type_abort(
98                 rec->private_data, struct db_ctdb_rec);
99         TDB_DATA data;
100         int ret;
101
102         /*
103          * We have to store the header with empty data. TODO: Fix the
104          * tdb-level cleanup
105          */
106
107         data.dptr = (uint8 *)&crec->header;
108         data.dsize = sizeof(crec->header);
109
110         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
111
112         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
113 }
114
115 static int db_ctdb_record_destr(struct db_record* data)
116 {
117         struct db_ctdb_rec *crec = talloc_get_type_abort(
118                 data->private_data, struct db_ctdb_rec);
119
120         DEBUG(10, (DEBUGLEVEL > 10
121                    ? "Unlocking db %u key %s\n"
122                    : "Unlocking db %u key %.20s\n",
123                    (int)crec->ctdb_ctx->db_id,
124                    hex_encode(data, (unsigned char *)data->key.dptr,
125                               data->key.dsize)));
126
127         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
128                 DEBUG(0, ("tdb_chainunlock failed\n"));
129                 return -1;
130         }
131
132         return 0;
133 }
134
135 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
136                                               TALLOC_CTX *mem_ctx,
137                                               TDB_DATA key)
138 {
139         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
140                                                         struct db_ctdb_ctx);
141         struct db_record *result;
142         struct db_ctdb_rec *crec;
143         NTSTATUS status;
144         TDB_DATA ctdb_data;
145         int migrate_attempts = 0;
146
147         if (!(result = talloc(mem_ctx, struct db_record))) {
148                 DEBUG(0, ("talloc failed\n"));
149                 return NULL;
150         }
151
152         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
153                 DEBUG(0, ("talloc failed\n"));
154                 TALLOC_FREE(result);
155                 return NULL;
156         }
157
158         result->private_data = (void *)crec;
159         crec->ctdb_ctx = ctx;
160
161         result->key.dsize = key.dsize;
162         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
163         if (result->key.dptr == NULL) {
164                 DEBUG(0, ("talloc failed\n"));
165                 TALLOC_FREE(result);
166                 return NULL;
167         }
168
169         /*
170          * Do a blocking lock on the record
171          */
172 again:
173
174         if (DEBUGLEVEL >= 10) {
175                 char *keystr = hex_encode(result, key.dptr, key.dsize);
176                 DEBUG(10, (DEBUGLEVEL > 10
177                            ? "Locking db %u key %s\n"
178                            : "Locking db %u key %.20s\n",
179                            (int)crec->ctdb_ctx->db_id, keystr));
180                 TALLOC_FREE(keystr);
181         }
182         
183         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
184                 DEBUG(3, ("tdb_chainlock failed\n"));
185                 TALLOC_FREE(result);
186                 return NULL;
187         }
188
189         if (db->persistent) {
190                 result->store = db_ctdb_store_persistent;
191         } else {
192                 result->store = db_ctdb_store;
193         }
194         result->delete_rec = db_ctdb_delete;
195         talloc_set_destructor(result, db_ctdb_record_destr);
196
197         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
198
199         /*
200          * See if we have a valid record and we are the dmaster. If so, we can
201          * take the shortcut and just return it.
202          */
203
204         if ((ctdb_data.dptr == NULL) ||
205             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
206             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
207 #if 0
208             || (random() % 2 != 0)
209 #endif
210 ) {
211                 SAFE_FREE(ctdb_data.dptr);
212                 tdb_chainunlock(ctx->wtdb->tdb, key);
213                 talloc_set_destructor(result, NULL);
214
215                 migrate_attempts += 1;
216
217                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
218                            ctdb_data.dptr, ctdb_data.dptr ?
219                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
220                            get_my_vnn()));
221
222                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
223                 if (!NT_STATUS_IS_OK(status)) {
224                         DEBUG(5, ("ctdb_migrate failed: %s\n",
225                                   nt_errstr(status)));
226                         TALLOC_FREE(result);
227                         return NULL;
228                 }
229                 /* now its migrated, try again */
230                 goto again;
231         }
232
233         if (migrate_attempts > 10) {
234                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
235                           migrate_attempts));
236         }
237
238         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
239
240         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
241         result->value.dptr = NULL;
242
243         if ((result->value.dsize != 0)
244             && !(result->value.dptr = (uint8 *)talloc_memdup(
245                          result, ctdb_data.dptr + sizeof(crec->header),
246                          result->value.dsize))) {
247                 DEBUG(0, ("talloc failed\n"));
248                 TALLOC_FREE(result);
249         }
250
251         SAFE_FREE(ctdb_data.dptr);
252
253         return result;
254 }
255
256 /*
257   fetch (unlocked, no migration) operation on ctdb
258  */
259 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
260                          TDB_DATA key, TDB_DATA *data)
261 {
262         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
263                                                         struct db_ctdb_ctx);
264         NTSTATUS status;
265         TDB_DATA ctdb_data;
266
267         /* try a direct fetch */
268         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
269
270         /*
271          * See if we have a valid record and we are the dmaster. If so, we can
272          * take the shortcut and just return it.
273          * we bypass the dmaster check for persistent databases
274          */
275         if ((ctdb_data.dptr != NULL) &&
276             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
277             (db->persistent ||
278              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
279                 /* we are the dmaster - avoid the ctdb protocol op */
280
281                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
282                 if (data->dsize == 0) {
283                         SAFE_FREE(ctdb_data.dptr);
284                         data->dptr = NULL;
285                         return 0;
286                 }
287
288                 data->dptr = (uint8 *)talloc_memdup(
289                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
290                         data->dsize);
291
292                 SAFE_FREE(ctdb_data.dptr);
293
294                 if (data->dptr == NULL) {
295                         return -1;
296                 }
297                 return 0;
298         }
299
300         SAFE_FREE(ctdb_data.dptr);
301
302         /* we weren't able to get it locally - ask ctdb to fetch it for us */
303         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
304         if (!NT_STATUS_IS_OK(status)) {
305                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
306                 return -1;
307         }
308
309         return 0;
310 }
311
312 struct traverse_state {
313         struct db_context *db;
314         int (*fn)(struct db_record *rec, void *private_data);
315         void *private_data;
316 };
317
318 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
319 {
320         struct traverse_state *state = (struct traverse_state *)private_data;
321         struct db_record *rec;
322         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
323         /* we have to give them a locked record to prevent races */
324         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
325         if (rec && rec->value.dsize > 0) {
326                 state->fn(rec, state->private_data);
327         }
328         talloc_free(tmp_ctx);
329 }
330
331 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
332                                         void *private_data)
333 {
334         struct traverse_state *state = (struct traverse_state *)private_data;
335         struct db_record *rec;
336         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
337         int ret = 0;
338         /* we have to give them a locked record to prevent races */
339         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
340         if (rec && rec->value.dsize > 0) {
341                 ret = state->fn(rec, state->private_data);
342         }
343         talloc_free(tmp_ctx);
344         return ret;
345 }
346
347 static int db_ctdb_traverse(struct db_context *db,
348                             int (*fn)(struct db_record *rec,
349                                       void *private_data),
350                             void *private_data)
351 {
352         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
353                                                         struct db_ctdb_ctx);
354         struct traverse_state state;
355
356         state.db = db;
357         state.fn = fn;
358         state.private_data = private_data;
359
360         if (db->persistent) {
361                 /* for persistent databases we don't need to do a ctdb traverse,
362                    we can do a faster local traverse */
363                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
364         }
365
366
367         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
368         return 0;
369 }
370
371 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
372 {
373         return NT_STATUS_MEDIA_WRITE_PROTECTED;
374 }
375
376 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
377 {
378         return NT_STATUS_MEDIA_WRITE_PROTECTED;
379 }
380
381 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
382 {
383         struct traverse_state *state = (struct traverse_state *)private_data;
384         struct db_record rec;
385         rec.key = key;
386         rec.value = data;
387         rec.store = db_ctdb_store_deny;
388         rec.delete_rec = db_ctdb_delete_deny;
389         rec.private_data = state->db;
390         state->fn(&rec, state->private_data);
391 }
392
393 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
394                                         void *private_data)
395 {
396         struct traverse_state *state = (struct traverse_state *)private_data;
397         struct db_record rec;
398         rec.key = kbuf;
399         rec.value = dbuf;
400         rec.store = db_ctdb_store_deny;
401         rec.delete_rec = db_ctdb_delete_deny;
402         rec.private_data = state->db;
403
404         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
405                 /* a deleted record */
406                 return 0;
407         }
408         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
409         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
410
411         return state->fn(&rec, state->private_data);
412 }
413
414 static int db_ctdb_traverse_read(struct db_context *db,
415                                  int (*fn)(struct db_record *rec,
416                                            void *private_data),
417                                  void *private_data)
418 {
419         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
420                                                         struct db_ctdb_ctx);
421         struct traverse_state state;
422
423         state.db = db;
424         state.fn = fn;
425         state.private_data = private_data;
426
427         if (db->persistent) {
428                 /* for persistent databases we don't need to do a ctdb traverse,
429                    we can do a faster local traverse */
430                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
431         }
432
433         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
434         return 0;
435 }
436
437 static int db_ctdb_get_seqnum(struct db_context *db)
438 {
439         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
440                                                         struct db_ctdb_ctx);
441         return tdb_get_seqnum(ctx->wtdb->tdb);
442 }
443
444
445 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
446                                 const char *name,
447                                 int hash_size, int tdb_flags,
448                                 int open_flags, mode_t mode)
449 {
450         struct db_context *result;
451         struct db_ctdb_ctx *db_ctdb;
452         char *db_path;
453
454         if (!lp_clustering()) {
455                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
456                 return NULL;
457         }
458
459         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
460                 DEBUG(0, ("talloc failed\n"));
461                 TALLOC_FREE(result);
462                 return NULL;
463         }
464
465         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
466                 DEBUG(0, ("talloc failed\n"));
467                 TALLOC_FREE(result);
468                 return NULL;
469         }
470
471         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
472                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
473                 TALLOC_FREE(result);
474                 return NULL;
475         }
476
477         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
478
479         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
480
481         /* only pass through specific flags */
482         tdb_flags &= TDB_SEQNUM;
483
484         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
485         if (db_ctdb->wtdb == NULL) {
486                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
487                 TALLOC_FREE(result);
488                 return NULL;
489         }
490         talloc_free(db_path);
491
492         result->private_data = (void *)db_ctdb;
493         result->fetch_locked = db_ctdb_fetch_locked;
494         result->fetch = db_ctdb_fetch;
495         result->traverse = db_ctdb_traverse;
496         result->traverse_read = db_ctdb_traverse_read;
497         result->get_seqnum = db_ctdb_get_seqnum;
498
499         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
500                  name, db_ctdb->db_id));
501
502         return result;
503 }
504 #endif