aa2f9d371ca36c0437c4f0651a3f76c29c361e10
[jra/samba/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_ctx {
27         struct tdb_wrap *wtdb;
28         uint32 db_id;
29 };
30
31 struct db_ctdb_rec {
32         struct db_ctdb_ctx *ctdb_ctx;
33         struct ctdb_ltdb_header header;
34 };
35
36 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
37 {
38         struct db_ctdb_rec *crec = talloc_get_type_abort(
39                 rec->private_data, struct db_ctdb_rec);
40         TDB_DATA cdata;
41         int ret;
42
43         cdata.dsize = sizeof(crec->header) + data.dsize;
44
45         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
46                 return NT_STATUS_NO_MEMORY;
47         }
48
49         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
50         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
51
52         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
53
54         SAFE_FREE(cdata.dptr);
55
56         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
57 }
58
59
60 /* for persistent databases the store is a bit different. We have to
61    ask the ctdb daemon to push the record to all nodes after the
62    store */
63 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
64 {
65         struct db_ctdb_rec *crec = talloc_get_type_abort(
66                 rec->private_data, struct db_ctdb_rec);
67         TDB_DATA cdata;
68         int ret;
69         NTSTATUS status;
70
71         cdata.dsize = sizeof(crec->header) + data.dsize;
72
73         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
74                 return NT_STATUS_NO_MEMORY;
75         }
76
77         crec->header.rsn++;
78
79         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
80         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
81
82         status = ctdbd_start_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
83         
84         /* now tell ctdbd to update this record on all other nodes */
85         if (NT_STATUS_IS_OK(status)) {
86                 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
87                 status = (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
88         }
89
90         /* now tell ctdbd to update this record on all other nodes */
91         if (NT_STATUS_IS_OK(status)) {
92                 status = ctdbd_persistent_store(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
93         } else {
94                 ctdbd_cancel_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
95         }
96
97         SAFE_FREE(cdata.dptr);
98
99         return status;
100 }
101
102 static NTSTATUS db_ctdb_delete(struct db_record *rec)
103 {
104         struct db_ctdb_rec *crec = talloc_get_type_abort(
105                 rec->private_data, struct db_ctdb_rec);
106         TDB_DATA data;
107         int ret;
108
109         /*
110          * We have to store the header with empty data. TODO: Fix the
111          * tdb-level cleanup
112          */
113
114         data.dptr = (uint8 *)&crec->header;
115         data.dsize = sizeof(crec->header);
116
117         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
118
119         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
120 }
121
122 static int db_ctdb_record_destr(struct db_record* data)
123 {
124         struct db_ctdb_rec *crec = talloc_get_type_abort(
125                 data->private_data, struct db_ctdb_rec);
126
127         DEBUG(10, (DEBUGLEVEL > 10
128                    ? "Unlocking db %u key %s\n"
129                    : "Unlocking db %u key %.20s\n",
130                    (int)crec->ctdb_ctx->db_id,
131                    hex_encode(data, (unsigned char *)data->key.dptr,
132                               data->key.dsize)));
133
134         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
135                 DEBUG(0, ("tdb_chainunlock failed\n"));
136                 return -1;
137         }
138
139         return 0;
140 }
141
142 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
143                                               TALLOC_CTX *mem_ctx,
144                                               TDB_DATA key)
145 {
146         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
147                                                         struct db_ctdb_ctx);
148         struct db_record *result;
149         struct db_ctdb_rec *crec;
150         NTSTATUS status;
151         TDB_DATA ctdb_data;
152         int migrate_attempts = 0;
153
154         if (!(result = talloc(mem_ctx, struct db_record))) {
155                 DEBUG(0, ("talloc failed\n"));
156                 return NULL;
157         }
158
159         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
160                 DEBUG(0, ("talloc failed\n"));
161                 TALLOC_FREE(result);
162                 return NULL;
163         }
164
165         result->private_data = (void *)crec;
166         crec->ctdb_ctx = ctx;
167
168         result->key.dsize = key.dsize;
169         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
170         if (result->key.dptr == NULL) {
171                 DEBUG(0, ("talloc failed\n"));
172                 TALLOC_FREE(result);
173                 return NULL;
174         }
175
176         /*
177          * Do a blocking lock on the record
178          */
179 again:
180
181         if (DEBUGLEVEL >= 10) {
182                 char *keystr = hex_encode(result, key.dptr, key.dsize);
183                 DEBUG(10, (DEBUGLEVEL > 10
184                            ? "Locking db %u key %s\n"
185                            : "Locking db %u key %.20s\n",
186                            (int)crec->ctdb_ctx->db_id, keystr));
187                 TALLOC_FREE(keystr);
188         }
189         
190         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
191                 DEBUG(3, ("tdb_chainlock failed\n"));
192                 TALLOC_FREE(result);
193                 return NULL;
194         }
195
196         if (db->persistent) {
197                 result->store = db_ctdb_store_persistent;
198         } else {
199                 result->store = db_ctdb_store;
200         }
201         result->delete_rec = db_ctdb_delete;
202         talloc_set_destructor(result, db_ctdb_record_destr);
203
204         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
205
206         /*
207          * See if we have a valid record and we are the dmaster. If so, we can
208          * take the shortcut and just return it.
209          */
210
211         if ((ctdb_data.dptr == NULL) ||
212             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
213             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
214 #if 0
215             || (random() % 2 != 0)
216 #endif
217 ) {
218                 SAFE_FREE(ctdb_data.dptr);
219                 tdb_chainunlock(ctx->wtdb->tdb, key);
220                 talloc_set_destructor(result, NULL);
221
222                 migrate_attempts += 1;
223
224                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
225                            ctdb_data.dptr, ctdb_data.dptr ?
226                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
227                            get_my_vnn()));
228
229                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
230                 if (!NT_STATUS_IS_OK(status)) {
231                         DEBUG(5, ("ctdb_migrate failed: %s\n",
232                                   nt_errstr(status)));
233                         TALLOC_FREE(result);
234                         return NULL;
235                 }
236                 /* now its migrated, try again */
237                 goto again;
238         }
239
240         if (migrate_attempts > 10) {
241                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
242                           migrate_attempts));
243         }
244
245         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
246
247         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
248         result->value.dptr = NULL;
249
250         if ((result->value.dsize != 0)
251             && !(result->value.dptr = (uint8 *)talloc_memdup(
252                          result, ctdb_data.dptr + sizeof(crec->header),
253                          result->value.dsize))) {
254                 DEBUG(0, ("talloc failed\n"));
255                 TALLOC_FREE(result);
256         }
257
258         SAFE_FREE(ctdb_data.dptr);
259
260         return result;
261 }
262
263 /*
264   fetch (unlocked, no migration) operation on ctdb
265  */
266 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
267                          TDB_DATA key, TDB_DATA *data)
268 {
269         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
270                                                         struct db_ctdb_ctx);
271         NTSTATUS status;
272         TDB_DATA ctdb_data;
273
274         /* try a direct fetch */
275         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
276
277         /*
278          * See if we have a valid record and we are the dmaster. If so, we can
279          * take the shortcut and just return it.
280          * we bypass the dmaster check for persistent databases
281          */
282         if ((ctdb_data.dptr != NULL) &&
283             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
284             (db->persistent ||
285              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
286                 /* we are the dmaster - avoid the ctdb protocol op */
287
288                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
289                 if (data->dsize == 0) {
290                         SAFE_FREE(ctdb_data.dptr);
291                         data->dptr = NULL;
292                         return 0;
293                 }
294
295                 data->dptr = (uint8 *)talloc_memdup(
296                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
297                         data->dsize);
298
299                 SAFE_FREE(ctdb_data.dptr);
300
301                 if (data->dptr == NULL) {
302                         return -1;
303                 }
304                 return 0;
305         }
306
307         SAFE_FREE(ctdb_data.dptr);
308
309         /* we weren't able to get it locally - ask ctdb to fetch it for us */
310         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
311         if (!NT_STATUS_IS_OK(status)) {
312                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
313                 return -1;
314         }
315
316         return 0;
317 }
318
319 struct traverse_state {
320         struct db_context *db;
321         int (*fn)(struct db_record *rec, void *private_data);
322         void *private_data;
323 };
324
325 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
326 {
327         struct traverse_state *state = (struct traverse_state *)private_data;
328         struct db_record *rec;
329         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
330         /* we have to give them a locked record to prevent races */
331         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
332         if (rec && rec->value.dsize > 0) {
333                 state->fn(rec, state->private_data);
334         }
335         talloc_free(tmp_ctx);
336 }
337
338 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
339                                         void *private_data)
340 {
341         struct traverse_state *state = (struct traverse_state *)private_data;
342         struct db_record *rec;
343         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
344         int ret = 0;
345         /* we have to give them a locked record to prevent races */
346         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
347         if (rec && rec->value.dsize > 0) {
348                 ret = state->fn(rec, state->private_data);
349         }
350         talloc_free(tmp_ctx);
351         return ret;
352 }
353
354 static int db_ctdb_traverse(struct db_context *db,
355                             int (*fn)(struct db_record *rec,
356                                       void *private_data),
357                             void *private_data)
358 {
359         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
360                                                         struct db_ctdb_ctx);
361         struct traverse_state state;
362
363         state.db = db;
364         state.fn = fn;
365         state.private_data = private_data;
366
367         if (db->persistent) {
368                 /* for persistent databases we don't need to do a ctdb traverse,
369                    we can do a faster local traverse */
370                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
371         }
372
373
374         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
375         return 0;
376 }
377
378 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
379 {
380         return NT_STATUS_MEDIA_WRITE_PROTECTED;
381 }
382
383 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
384 {
385         return NT_STATUS_MEDIA_WRITE_PROTECTED;
386 }
387
388 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
389 {
390         struct traverse_state *state = (struct traverse_state *)private_data;
391         struct db_record rec;
392         rec.key = key;
393         rec.value = data;
394         rec.store = db_ctdb_store_deny;
395         rec.delete_rec = db_ctdb_delete_deny;
396         rec.private_data = state->db;
397         state->fn(&rec, state->private_data);
398 }
399
400 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
401                                         void *private_data)
402 {
403         struct traverse_state *state = (struct traverse_state *)private_data;
404         struct db_record rec;
405         rec.key = kbuf;
406         rec.value = dbuf;
407         rec.store = db_ctdb_store_deny;
408         rec.delete_rec = db_ctdb_delete_deny;
409         rec.private_data = state->db;
410
411         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
412                 /* a deleted record */
413                 return 0;
414         }
415         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
416         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
417
418         return state->fn(&rec, state->private_data);
419 }
420
421 static int db_ctdb_traverse_read(struct db_context *db,
422                                  int (*fn)(struct db_record *rec,
423                                            void *private_data),
424                                  void *private_data)
425 {
426         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
427                                                         struct db_ctdb_ctx);
428         struct traverse_state state;
429
430         state.db = db;
431         state.fn = fn;
432         state.private_data = private_data;
433
434         if (db->persistent) {
435                 /* for persistent databases we don't need to do a ctdb traverse,
436                    we can do a faster local traverse */
437                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
438         }
439
440         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
441         return 0;
442 }
443
444 static int db_ctdb_get_seqnum(struct db_context *db)
445 {
446         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
447                                                         struct db_ctdb_ctx);
448         return tdb_get_seqnum(ctx->wtdb->tdb);
449 }
450
451 static int db_ctdb_trans_dummy(struct db_context *db)
452 {
453         /*
454          * Not implemented yet, just return ok
455          */
456         return 0;
457 }
458
459 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
460                                 const char *name,
461                                 int hash_size, int tdb_flags,
462                                 int open_flags, mode_t mode)
463 {
464         struct db_context *result;
465         struct db_ctdb_ctx *db_ctdb;
466         char *db_path;
467
468         if (!lp_clustering()) {
469                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
470                 return NULL;
471         }
472
473         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
474                 DEBUG(0, ("talloc failed\n"));
475                 TALLOC_FREE(result);
476                 return NULL;
477         }
478
479         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
480                 DEBUG(0, ("talloc failed\n"));
481                 TALLOC_FREE(result);
482                 return NULL;
483         }
484
485         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
486                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
487                 TALLOC_FREE(result);
488                 return NULL;
489         }
490
491         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
492
493         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
494
495         /* only pass through specific flags */
496         tdb_flags &= TDB_SEQNUM;
497
498         /* honor permissions if user has specified O_CREAT */
499         if (open_flags & O_CREAT) {
500                 chmod(db_path, mode);
501         }
502
503         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
504         if (db_ctdb->wtdb == NULL) {
505                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
506                 TALLOC_FREE(result);
507                 return NULL;
508         }
509         talloc_free(db_path);
510
511         result->private_data = (void *)db_ctdb;
512         result->fetch_locked = db_ctdb_fetch_locked;
513         result->fetch = db_ctdb_fetch;
514         result->traverse = db_ctdb_traverse;
515         result->traverse_read = db_ctdb_traverse_read;
516         result->get_seqnum = db_ctdb_get_seqnum;
517         result->transaction_start = db_ctdb_trans_dummy;
518         result->transaction_commit = db_ctdb_trans_dummy;
519         result->transaction_cancel = db_ctdb_trans_dummy;
520
521         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
522                  name, db_ctdb->db_id));
523
524         return result;
525 }
526 #endif