Fix dbwrap debug output
[jra/samba/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21
22 #ifdef CLUSTER_SUPPORT
23
24 #include "ctdb.h"
25 #include "ctdb_private.h"
26
27 struct db_ctdb_ctx {
28         struct tdb_wrap *wtdb;
29         uint32 db_id;
30         struct ctdbd_connection *conn;
31 };
32
33 struct db_ctdb_rec {
34         struct db_ctdb_ctx *ctdb_ctx;
35         struct ctdb_ltdb_header header;
36 };
37
38 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx);
39
40 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
41 {
42         struct db_ctdb_rec *crec = talloc_get_type_abort(
43                 rec->private_data, struct db_ctdb_rec);
44         TDB_DATA cdata;
45         int ret;
46
47         cdata.dsize = sizeof(crec->header) + data.dsize;
48
49         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
50                 return NT_STATUS_NO_MEMORY;
51         }
52
53         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
54         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
55
56         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
57
58         SAFE_FREE(cdata.dptr);
59
60         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
61 }
62
63 static NTSTATUS db_ctdb_delete(struct db_record *rec)
64 {
65         struct db_ctdb_rec *crec = talloc_get_type_abort(
66                 rec->private_data, struct db_ctdb_rec);
67         TDB_DATA data;
68         int ret;
69
70         /*
71          * We have to store the header with empty data. TODO: Fix the
72          * tdb-level cleanup
73          */
74
75         data.dptr = (uint8 *)&crec->header;
76         data.dsize = sizeof(crec->header);
77
78         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
79
80         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
81 }
82
83 static int db_ctdb_record_destr(struct db_record* data)
84 {
85         struct db_ctdb_rec *crec = talloc_get_type_abort(
86                 data->private_data, struct db_ctdb_rec);
87
88         DEBUG(10, (DEBUGLEVEL > 10
89                    ? "Unlocking db %u key %s\n"
90                    : "Unlocking db %u key %.20s\n",
91                    (int)crec->ctdb_ctx->db_id,
92                    hex_encode(data, (unsigned char *)data->key.dptr,
93                               data->key.dsize)));
94
95         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
96                 DEBUG(0, ("tdb_chainunlock failed\n"));
97                 return -1;
98         }
99
100         return 0;
101 }
102
103 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
104                                               TALLOC_CTX *mem_ctx,
105                                               TDB_DATA key)
106 {
107         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
108                                                         struct db_ctdb_ctx);
109         struct db_record *result;
110         struct db_ctdb_rec *crec;
111         NTSTATUS status;
112         TDB_DATA ctdb_data;
113
114         if (!(result = talloc(mem_ctx, struct db_record))) {
115                 DEBUG(0, ("talloc failed\n"));
116                 return NULL;
117         }
118
119         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
120                 DEBUG(0, ("talloc failed\n"));
121                 TALLOC_FREE(result);
122                 return NULL;
123         }
124
125         result->private_data = (void *)crec;
126         crec->ctdb_ctx = ctx;
127
128         result->key.dsize = key.dsize;
129         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
130         if (result->key.dptr == NULL) {
131                 DEBUG(0, ("talloc failed\n"));
132                 TALLOC_FREE(result);
133                 return NULL;
134         }
135
136         /*
137          * Do a blocking lock on the record
138          */
139 again:
140
141         if (DEBUGLEVEL >= 10) {
142                 char *keystr = hex_encode(result, key.dptr, key.dsize);
143                 DEBUG(10, (DEBUGLEVEL > 10
144                            ? "Locking db %u key %s\n"
145                            : "Locking db %u key %.20s\n",
146                            (int)crec->ctdb_ctx->db_id, keystr));
147                 TALLOC_FREE(keystr);
148         }
149         
150         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
151                 DEBUG(3, ("tdb_chainlock failed\n"));
152                 TALLOC_FREE(result);
153                 return NULL;
154         }
155
156         result->store = db_ctdb_store;
157         result->delete_rec = db_ctdb_delete;
158         talloc_set_destructor(result, db_ctdb_record_destr);
159
160         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
161
162         /*
163          * See if we have a valid record and we are the dmaster. If so, we can
164          * take the shortcut and just return it.
165          */
166
167         if ((ctdb_data.dptr == NULL) ||
168             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
169             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
170 #if 0
171             || (random() % 2 != 0)
172 #endif
173 ) {
174                 SAFE_FREE(ctdb_data.dptr);
175                 tdb_chainunlock(ctx->wtdb->tdb, key);
176                 talloc_set_destructor(result, NULL);
177
178                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
179                            ctdb_data.dptr, ctdb_data.dptr ?
180                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
181                            get_my_vnn()));
182
183                 status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key);
184                 if (!NT_STATUS_IS_OK(status)) {
185                         DEBUG(5, ("ctdb_migrate failed: %s\n",
186                                   nt_errstr(status)));
187                         TALLOC_FREE(result);
188                         return NULL;
189                 }
190                 /* now its migrated, try again */
191                 goto again;
192         }
193
194         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
195
196         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
197         result->value.dptr = NULL;
198
199         if ((result->value.dsize != 0)
200             && !(result->value.dptr = (uint8 *)talloc_memdup(
201                          result, ctdb_data.dptr + sizeof(crec->header),
202                          result->value.dsize))) {
203                 DEBUG(0, ("talloc failed\n"));
204                 TALLOC_FREE(result);
205         }
206
207         SAFE_FREE(ctdb_data.dptr);
208
209         return result;
210 }
211
212 /*
213   fetch (unlocked, no migration) operation on ctdb
214  */
215 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
216                          TDB_DATA key, TDB_DATA *data)
217 {
218         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
219                                                         struct db_ctdb_ctx);
220         NTSTATUS status;
221         TDB_DATA ctdb_data;
222
223         /* try a direct fetch */
224         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
225
226         /*
227          * See if we have a valid record and we are the dmaster. If so, we can
228          * take the shortcut and just return it.
229          */
230         if ((ctdb_data.dptr != NULL) &&
231             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
232             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) {
233                 /* we are the dmaster - avoid the ctdb protocol op */
234
235                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
236                 if (data->dsize == 0) {
237                         SAFE_FREE(ctdb_data.dptr);
238                         data->dptr = NULL;
239                         return 0;
240                 }
241
242                 data->dptr = (uint8 *)talloc_memdup(
243                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
244                         data->dsize);
245
246                 SAFE_FREE(ctdb_data.dptr);
247
248                 if (data->dptr == NULL) {
249                         return -1;
250                 }
251                 return 0;
252         }
253
254         SAFE_FREE(ctdb_data.dptr);
255
256         /* we weren't able to get it locally - ask ctdb to fetch it for us */
257         status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx,
258                              data);
259         if (!NT_STATUS_IS_OK(status)) {
260                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
261                 return -1;
262         }
263
264         return 0;
265 }
266
267 struct traverse_state {
268         struct db_context *db;
269         int (*fn)(struct db_record *rec, void *private_data);
270         void *private_data;
271 };
272
273 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
274 {
275         struct traverse_state *state = (struct traverse_state *)private_data;
276         struct db_record *rec;
277         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
278         /* we have to give them a locked record to prevent races */
279         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
280         if (rec && rec->value.dsize > 0) {
281                 state->fn(rec, state->private_data);
282         }
283         talloc_free(tmp_ctx);
284 }
285
286 static int db_ctdb_traverse(struct db_context *db,
287                             int (*fn)(struct db_record *rec,
288                                       void *private_data),
289                             void *private_data)
290 {
291         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
292                                                         struct db_ctdb_ctx);
293         struct traverse_state state;
294
295         state.db = db;
296         state.fn = fn;
297         state.private_data = private_data;
298
299         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
300         return 0;
301 }
302
303 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
304 {
305         return NT_STATUS_MEDIA_WRITE_PROTECTED;
306 }
307
308 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
309 {
310         return NT_STATUS_MEDIA_WRITE_PROTECTED;
311 }
312
313 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
314 {
315         struct traverse_state *state = (struct traverse_state *)private_data;
316         struct db_record rec;
317         rec.key = key;
318         rec.value = data;
319         rec.store = db_ctdb_store_deny;
320         rec.delete_rec = db_ctdb_delete_deny;
321         rec.private_data = state->db;
322         state->fn(&rec, state->private_data);
323 }
324
325 static int db_ctdb_traverse_read(struct db_context *db,
326                                  int (*fn)(struct db_record *rec,
327                                            void *private_data),
328                                  void *private_data)
329 {
330         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
331                                                         struct db_ctdb_ctx);
332         struct traverse_state state;
333
334         state.db = db;
335         state.fn = fn;
336         state.private_data = private_data;
337
338         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
339         return 0;
340 }
341
342 static int db_ctdb_get_seqnum(struct db_context *db)
343 {
344         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
345                                                         struct db_ctdb_ctx);
346         return tdb_get_seqnum(ctx->wtdb->tdb);
347 }
348
349 /*
350  * Get the ctdbd connection for a database. If possible, re-use the messaging
351  * ctdbd connection
352  */
353 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx)
354 {
355         struct ctdbd_connection *result;
356
357         result = messaging_ctdbd_connection();
358
359         if (result != NULL) {
360
361                 if (ctx->conn == NULL) {
362                         /*
363                          * Someone has initialized messaging since we
364                          * initialized our own connection, we don't need it
365                          * anymore.
366                          */
367                         TALLOC_FREE(ctx->conn);
368                 }
369
370                 return result;
371         }
372
373         if (ctx->conn == NULL) {
374                 NTSTATUS status;
375                 status = ctdbd_init_connection(ctx, &ctx->conn);
376                 if (!NT_STATUS_IS_OK(status)) {
377                         return NULL;
378                 }
379                 set_my_vnn(ctdbd_vnn(ctx->conn));
380         }
381
382         return ctx->conn;
383 }
384
385 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
386                                 const char *name,
387                                 int hash_size, int tdb_flags,
388                                 int open_flags, mode_t mode)
389 {
390         struct db_context *result;
391         struct db_ctdb_ctx *db_ctdb;
392         char *db_path;
393         NTSTATUS status;
394
395         if (!lp_clustering()) {
396                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
397                 return NULL;
398         }
399
400         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
401                 DEBUG(0, ("talloc failed\n"));
402                 TALLOC_FREE(result);
403                 return NULL;
404         }
405
406         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
407                 DEBUG(0, ("talloc failed\n"));
408                 TALLOC_FREE(result);
409                 return NULL;
410         }
411
412         db_ctdb->conn = NULL;
413
414         status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name,
415                                  &db_ctdb->db_id, tdb_flags);
416
417         if (!NT_STATUS_IS_OK(status)) {
418                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
419                           nt_errstr(status)));
420                 TALLOC_FREE(result);
421                 return NULL;
422         }
423
424         db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb,
425                                db_ctdb->db_id);
426
427         /* only pass through specific flags */
428         tdb_flags &= TDB_SEQNUM;
429
430         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
431         if (db_ctdb->wtdb == NULL) {
432                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
433                 TALLOC_FREE(result);
434                 return NULL;
435         }
436         talloc_free(db_path);
437
438         result->private_data = (void *)db_ctdb;
439         result->fetch_locked = db_ctdb_fetch_locked;
440         result->fetch = db_ctdb_fetch;
441         result->traverse = db_ctdb_traverse;
442         result->traverse_read = db_ctdb_traverse_read;
443         result->get_seqnum = db_ctdb_get_seqnum;
444
445         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
446                  name, db_ctdb->db_id));
447
448         return result;
449 }
450
451 #else
452
453 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
454                                 const char *name,
455                                 int hash_size, int tdb_flags,
456                                 int open_flags, mode_t mode)
457 {
458         DEBUG(0, ("no clustering compiled in\n"));
459         return NULL;
460 }
461
462 #endif