r24113: some little fixes to get the correct error message
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21
22 #ifdef CLUSTER_SUPPORT
23
24 #include "ctdb.h"
25 #include "ctdb_private.h"
26
27 struct db_ctdb_ctx {
28         struct tdb_wrap *wtdb;
29         uint32 db_id;
30         struct ctdbd_connection *conn;
31 };
32
33 struct db_ctdb_rec {
34         struct db_ctdb_ctx *ctdb_ctx;
35         struct ctdb_ltdb_header header;
36 };
37
38 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx);
39
40 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
41 {
42         struct db_ctdb_rec *crec = talloc_get_type_abort(
43                 rec->private_data, struct db_ctdb_rec);
44         TDB_DATA cdata;
45         int ret;
46
47         cdata.dsize = sizeof(crec->header) + data.dsize;
48
49         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
50                 return NT_STATUS_NO_MEMORY;
51         }
52
53         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
54         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
55
56         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
57
58         SAFE_FREE(cdata.dptr);
59
60         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
61 }
62
63 static NTSTATUS db_ctdb_delete(struct db_record *rec)
64 {
65         struct db_ctdb_rec *crec = talloc_get_type_abort(
66                 rec->private_data, struct db_ctdb_rec);
67         TDB_DATA data;
68         int ret;
69
70         /*
71          * We have to store the header with empty data. TODO: Fix the
72          * tdb-level cleanup
73          */
74
75         data.dptr = (uint8 *)&crec->header;
76         data.dsize = sizeof(crec->header);
77
78         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
79
80         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
81 }
82
83 static int db_ctdb_record_destr(struct db_record* data)
84 {
85         struct db_ctdb_rec *crec = talloc_get_type_abort(
86                 data->private_data, struct db_ctdb_rec);
87
88         DEBUG(10, ("Unlocking key %s\n",
89                    hex_encode(data, (unsigned char *)data->key.dptr,
90                               data->key.dsize)));
91
92         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
93                 DEBUG(0, ("tdb_chainunlock failed\n"));
94                 return -1;
95         }
96
97         return 0;
98 }
99
100 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
101                                               TALLOC_CTX *mem_ctx,
102                                               TDB_DATA key)
103 {
104         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
105                                                         struct db_ctdb_ctx);
106         struct db_record *result;
107         struct db_ctdb_rec *crec;
108         NTSTATUS status;
109         TDB_DATA ctdb_data;
110
111         if (!(result = talloc(mem_ctx, struct db_record))) {
112                 DEBUG(0, ("talloc failed\n"));
113                 return NULL;
114         }
115
116         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
117                 DEBUG(0, ("talloc failed\n"));
118                 TALLOC_FREE(result);
119                 return NULL;
120         }
121
122         result->private_data = (void *)crec;
123         crec->ctdb_ctx = ctx;
124
125         result->key.dsize = key.dsize;
126         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
127         if (result->key.dptr == NULL) {
128                 DEBUG(0, ("talloc failed\n"));
129                 TALLOC_FREE(result);
130                 return NULL;
131         }
132
133         /*
134          * Do a blocking lock on the record
135          */
136 again:
137
138         DEBUG(10, ("Locking key %s\n",
139                    hex_encode(result, (unsigned char *)key.dptr,
140                               key.dsize)));
141         
142         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
143                 DEBUG(3, ("tdb_chainlock failed\n"));
144                 TALLOC_FREE(result);
145                 return NULL;
146         }
147
148         result->store = db_ctdb_store;
149         result->delete_rec = db_ctdb_delete;
150         talloc_set_destructor(result, db_ctdb_record_destr);
151
152         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
153
154         /*
155          * See if we have a valid record and we are the dmaster. If so, we can
156          * take the shortcut and just return it.
157          */
158
159         if ((ctdb_data.dptr == NULL) ||
160             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
161             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
162 #if 0
163             || (random() % 2 != 0)
164 #endif
165 ) {
166                 SAFE_FREE(ctdb_data.dptr);
167                 tdb_chainunlock(ctx->wtdb->tdb, key);
168                 talloc_set_destructor(result, NULL);
169
170                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
171                            ctdb_data.dptr, ctdb_data.dptr ?
172                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
173                            get_my_vnn()));
174
175                 status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key);
176                 if (!NT_STATUS_IS_OK(status)) {
177                         DEBUG(5, ("ctdb_migrate failed: %s\n",
178                                   nt_errstr(status)));
179                         TALLOC_FREE(result);
180                         return NULL;
181                 }
182                 /* now its migrated, try again */
183                 goto again;
184         }
185
186         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
187
188         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
189         result->value.dptr = NULL;
190
191         if ((result->value.dsize != 0)
192             && !(result->value.dptr = (uint8 *)talloc_memdup(
193                          result, ctdb_data.dptr + sizeof(crec->header),
194                          result->value.dsize))) {
195                 DEBUG(0, ("talloc failed\n"));
196                 TALLOC_FREE(result);
197         }
198
199         SAFE_FREE(ctdb_data.dptr);
200
201         return result;
202 }
203
204 /*
205   fetch (unlocked, no migration) operation on ctdb
206  */
207 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
208                          TDB_DATA key, TDB_DATA *data)
209 {
210         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
211                                                         struct db_ctdb_ctx);
212         NTSTATUS status;
213         TDB_DATA ctdb_data;
214
215         /* try a direct fetch */
216         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
217
218         /*
219          * See if we have a valid record and we are the dmaster. If so, we can
220          * take the shortcut and just return it.
221          */
222         if ((ctdb_data.dptr != NULL) &&
223             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
224             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) {
225                 /* we are the dmaster - avoid the ctdb protocol op */
226
227                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
228                 if (data->dsize == 0) {
229                         SAFE_FREE(ctdb_data.dptr);
230                         data->dptr = NULL;
231                         return 0;
232                 }
233
234                 data->dptr = (uint8 *)talloc_memdup(
235                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
236                         data->dsize);
237
238                 SAFE_FREE(ctdb_data.dptr);
239
240                 if (data->dptr == NULL) {
241                         return -1;
242                 }
243                 return 0;
244         }
245
246         SAFE_FREE(ctdb_data.dptr);
247
248         /* we weren't able to get it locally - ask ctdb to fetch it for us */
249         status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx,
250                              data);
251         if (!NT_STATUS_IS_OK(status)) {
252                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
253                 return -1;
254         }
255
256         return 0;
257 }
258
259 struct traverse_state {
260         struct db_context *db;
261         int (*fn)(struct db_record *rec, void *private_data);
262         void *private_data;
263 };
264
265 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
266 {
267         struct traverse_state *state = (struct traverse_state *)private_data;
268         struct db_record *rec;
269         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
270         /* we have to give them a locked record to prevent races */
271         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
272         if (rec && rec->value.dsize > 0) {
273                 state->fn(rec, state->private_data);
274         }
275         talloc_free(tmp_ctx);
276 }
277
278 static int db_ctdb_traverse(struct db_context *db,
279                             int (*fn)(struct db_record *rec,
280                                       void *private_data),
281                             void *private_data)
282 {
283         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
284                                                         struct db_ctdb_ctx);
285         struct traverse_state state;
286
287         state.db = db;
288         state.fn = fn;
289         state.private_data = private_data;
290
291         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
292         return 0;
293 }
294
295 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
296 {
297         return NT_STATUS_MEDIA_WRITE_PROTECTED;
298 }
299
300 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
301 {
302         return NT_STATUS_MEDIA_WRITE_PROTECTED;
303 }
304
305 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
306 {
307         struct traverse_state *state = (struct traverse_state *)private_data;
308         struct db_record rec;
309         rec.key = key;
310         rec.value = data;
311         rec.store = db_ctdb_store_deny;
312         rec.delete_rec = db_ctdb_delete_deny;
313         rec.private_data = state->db;
314         state->fn(&rec, state->private_data);
315 }
316
317 static int db_ctdb_traverse_read(struct db_context *db,
318                                  int (*fn)(struct db_record *rec,
319                                            void *private_data),
320                                  void *private_data)
321 {
322         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
323                                                         struct db_ctdb_ctx);
324         struct traverse_state state;
325
326         state.db = db;
327         state.fn = fn;
328         state.private_data = private_data;
329
330         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
331         return 0;
332 }
333
334 static int db_ctdb_get_seqnum(struct db_context *db)
335 {
336         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
337                                                         struct db_ctdb_ctx);
338         return tdb_get_seqnum(ctx->wtdb->tdb);
339 }
340
341 /*
342  * Get the ctdbd connection for a database. If possible, re-use the messaging
343  * ctdbd connection
344  */
345 static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx)
346 {
347         struct ctdbd_connection *result;
348
349         result = messaging_ctdbd_connection();
350
351         if (result != NULL) {
352
353                 if (ctx->conn == NULL) {
354                         /*
355                          * Someone has initialized messaging since we
356                          * initialized our own connection, we don't need it
357                          * anymore.
358                          */
359                         TALLOC_FREE(ctx->conn);
360                 }
361
362                 return result;
363         }
364
365         if (ctx->conn == NULL) {
366                 NTSTATUS status;
367                 status = ctdbd_init_connection(ctx, &ctx->conn);
368                 if (!NT_STATUS_IS_OK(status)) {
369                         return NULL;
370                 }
371                 set_my_vnn(ctdbd_vnn(ctx->conn));
372         }
373
374         return ctx->conn;
375 }
376
377 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
378                                 const char *name,
379                                 int hash_size, int tdb_flags,
380                                 int open_flags, mode_t mode)
381 {
382         struct db_context *result;
383         struct db_ctdb_ctx *db_ctdb;
384         char *db_path;
385         NTSTATUS status;
386
387         if (!lp_clustering()) {
388                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
389                 return NULL;
390         }
391
392         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
393                 DEBUG(0, ("talloc failed\n"));
394                 TALLOC_FREE(result);
395                 return NULL;
396         }
397
398         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
399                 DEBUG(0, ("talloc failed\n"));
400                 TALLOC_FREE(result);
401                 return NULL;
402         }
403
404         db_ctdb->conn = NULL;
405
406         status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name,
407                                  &db_ctdb->db_id, tdb_flags);
408
409         if (!NT_STATUS_IS_OK(status)) {
410                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
411                           nt_errstr(status)));
412                 TALLOC_FREE(result);
413                 return NULL;
414         }
415
416         db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb,
417                                db_ctdb->db_id);
418
419         /* only pass through specific flags */
420         tdb_flags &= TDB_SEQNUM;
421
422         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
423         if (db_ctdb->wtdb == NULL) {
424                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
425                 TALLOC_FREE(result);
426                 return NULL;
427         }
428         talloc_free(db_path);
429
430         result->private_data = (void *)db_ctdb;
431         result->fetch_locked = db_ctdb_fetch_locked;
432         result->fetch = db_ctdb_fetch;
433         result->traverse = db_ctdb_traverse;
434         result->traverse_read = db_ctdb_traverse_read;
435         result->get_seqnum = db_ctdb_get_seqnum;
436
437         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
438                  name, db_ctdb->db_id));
439
440         return result;
441 }
442
443 #else
444
445 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
446                                 const char *name,
447                                 int hash_size, int tdb_flags,
448                                 int open_flags, mode_t mode)
449 {
450         DEBUG(0, ("no clustering compiled in\n"));
451         return NULL;
452 }
453
454 #endif