dsdb: lock metadata.tdb during lock_read in partitions module
[bbaumbach/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition_metadata.c
1 /*
2    Partitions ldb module - management of metadata.tdb for sequence number
3
4    Copyright (C) Amitay Isaacs <amitay@samba.org> 2011
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "dsdb/samdb/ldb_modules/partition.h"
21 #include "lib/ldb-samba/ldb_wrap.h"
22 #include "system/filesys.h"
23
24 #define LDB_METADATA_SEQ_NUM    "SEQ_NUM"
25
26
27 /*
28  * Read a key with uint64 value
29  */
30 static int partition_metadata_get_uint64(struct ldb_module *module,
31                                          const char *key, uint64_t *value,
32                                          uint64_t default_value)
33 {
34         struct partition_private_data *data;
35         struct tdb_context *tdb;
36         TDB_DATA tdb_key, tdb_data;
37         char *value_str;
38         TALLOC_CTX *tmp_ctx;
39         int error = 0;
40
41         data = talloc_get_type_abort(ldb_module_get_private(module),
42                                      struct partition_private_data);
43
44         if (!data || !data->metadata || !data->metadata->db) {
45                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
46                                         "partition_metadata: metadata tdb not initialized");
47         }
48
49         tmp_ctx = talloc_new(NULL);
50         if (tmp_ctx == NULL) {
51                 return ldb_module_oom(module);
52         }
53
54         tdb = data->metadata->db->tdb;
55
56         tdb_key.dptr = (uint8_t *)discard_const_p(char, key);
57         tdb_key.dsize = strlen(key);
58
59         tdb_data = tdb_fetch(tdb, tdb_key);
60         if (!tdb_data.dptr) {
61                 if (tdb_error(tdb) == TDB_ERR_NOEXIST) {
62                         *value = default_value;
63                         return LDB_SUCCESS;
64                 } else {
65                         return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
66                                                 tdb_errorstr(tdb));
67                 }
68         }
69
70         value_str = talloc_strndup(tmp_ctx, (char *)tdb_data.dptr, tdb_data.dsize);
71         if (value_str == NULL) {
72                 SAFE_FREE(tdb_data.dptr);
73                 talloc_free(tmp_ctx);
74                 return ldb_module_oom(module);
75         }
76
77         *value = strtoull_err(value_str, NULL, 10, &error);
78         if (error != 0) {
79                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
80                                         "partition_metadata: converision failed");
81         }
82
83         SAFE_FREE(tdb_data.dptr);
84         talloc_free(tmp_ctx);
85
86         return LDB_SUCCESS;
87 }
88
89
90 /*
91  * Write a key with uin64 value
92  */
93 static int partition_metadata_set_uint64(struct ldb_module *module,
94                                          const char *key, uint64_t value,
95                                          bool insert)
96 {
97         struct partition_private_data *data;
98         struct tdb_context *tdb;
99         TDB_DATA tdb_key, tdb_data;
100         int tdb_flag;
101         char *value_str;
102         TALLOC_CTX *tmp_ctx;
103
104         data = talloc_get_type_abort(ldb_module_get_private(module),
105                                      struct partition_private_data);
106
107         if (!data || !data->metadata || !data->metadata->db) {
108                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
109                                         "partition_metadata: metadata tdb not initialized");
110         }
111
112         tmp_ctx = talloc_new(NULL);
113         if (tmp_ctx == NULL) {
114                 return ldb_module_oom(module);
115         }
116
117         tdb = data->metadata->db->tdb;
118
119         value_str = talloc_asprintf(tmp_ctx, "%llu", (unsigned long long)value);
120         if (value_str == NULL) {
121                 talloc_free(tmp_ctx);
122                 return ldb_module_oom(module);
123         }
124
125         tdb_key.dptr = (uint8_t *)discard_const_p(char, key);
126         tdb_key.dsize = strlen(key);
127
128         tdb_data.dptr = (uint8_t *)value_str;
129         tdb_data.dsize = strlen(value_str);
130
131         if (insert) {
132                 tdb_flag = TDB_INSERT;
133         } else {
134                 tdb_flag = TDB_MODIFY;
135         }
136
137         if (tdb_store(tdb, tdb_key, tdb_data, tdb_flag) != 0) {
138                 int ret;
139                 char *error_string = talloc_asprintf(tmp_ctx, "%s: tdb_store of key %s failed: %s",
140                                                      tdb_name(tdb), key, tdb_errorstr(tdb));
141                 ret = ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
142                                        error_string);
143                 talloc_free(tmp_ctx);
144                 return ret;
145         }
146
147         talloc_free(tmp_ctx);
148
149         return LDB_SUCCESS;
150 }
151
152 int partition_metadata_inc_schema_sequence(struct ldb_module *module)
153 {
154         struct partition_private_data *data;
155         int ret;
156         uint64_t value = 0;
157
158         data = talloc_get_type_abort(ldb_module_get_private(module),
159                                     struct partition_private_data);
160         if (!data || !data->metadata) {
161                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
162                                         "partition_metadata: metadata not initialized");
163         }
164
165         if (data->metadata->in_transaction == 0) {
166                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
167                                         "partition_metadata: increment sequence number without transaction");
168         }
169         ret = partition_metadata_get_uint64(module, DSDB_METADATA_SCHEMA_SEQ_NUM, &value, 0);
170         if (ret != LDB_SUCCESS) {
171                 return ret;
172         }
173
174         value++;
175         ret = partition_metadata_set_uint64(module, DSDB_METADATA_SCHEMA_SEQ_NUM, value, false);
176         if (ret == LDB_ERR_OPERATIONS_ERROR) {
177                 /* Modify failed, let's try the add */
178                 ret = partition_metadata_set_uint64(module, DSDB_METADATA_SCHEMA_SEQ_NUM, value, true);
179         }
180         return ret;
181 }
182
183
184
185 /*
186  * Open sam.ldb.d/metadata.tdb.
187  */
188 static int partition_metadata_open(struct ldb_module *module, bool create)
189 {
190         struct ldb_context *ldb = ldb_module_get_ctx(module);
191         TALLOC_CTX *tmp_ctx;
192         struct partition_private_data *data;
193         struct loadparm_context *lp_ctx;
194         char *filename, *dirname;
195         int open_flags, tdb_flags, ldb_flags;
196         struct stat statbuf;
197
198         data = talloc_get_type_abort(ldb_module_get_private(module),
199                                      struct partition_private_data);
200         if (!data || !data->metadata) {
201                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
202                                         "partition_metadata: metadata not initialized");
203         }
204
205         tmp_ctx = talloc_new(NULL);
206         if (tmp_ctx == NULL) {
207                 return ldb_module_oom(module);
208         }
209
210         filename = ldb_relative_path(ldb,
211                                      tmp_ctx,
212                                      "sam.ldb.d/metadata.tdb");
213
214         if (!filename) {
215                 talloc_free(tmp_ctx);
216                 return ldb_oom(ldb);
217         }
218
219         open_flags = O_RDWR;
220         if (create) {
221                 open_flags |= O_CREAT;
222
223                 /* While provisioning, sam.ldb.d directory may not exist,
224                  * so create it. Ignore errors, if it already exists. */
225                 dirname = ldb_relative_path(ldb,
226                                             tmp_ctx,
227                                             "sam.ldb.d");
228                 if (!dirname) {
229                         talloc_free(tmp_ctx);
230                         return ldb_oom(ldb);
231                 }
232
233                 mkdir(dirname, 0700);
234                 talloc_free(dirname);
235         } else {
236                 if (stat(filename, &statbuf) != 0) {
237                         talloc_free(tmp_ctx);
238                         return LDB_ERR_OPERATIONS_ERROR;
239                 }
240         }
241
242         lp_ctx = talloc_get_type_abort(ldb_get_opaque(ldb, "loadparm"),
243                                        struct loadparm_context);
244
245         tdb_flags = lpcfg_tdb_flags(lp_ctx, TDB_DEFAULT|TDB_SEQNUM);
246
247         ldb_flags = ldb_module_flags(ldb);
248
249         if (ldb_flags & LDB_FLG_NOSYNC) {
250                 tdb_flags |= TDB_NOSYNC;
251         }
252
253         data->metadata->db = tdb_wrap_open(
254                 data->metadata, filename, 10,
255                 tdb_flags, open_flags, 0660);
256         if (data->metadata->db == NULL) {
257                 talloc_free(tmp_ctx);
258                 if (create) {
259                         ldb_asprintf_errstring(ldb, "partition_metadata: Unable to create %s: %s",
260                                                filename, strerror(errno));
261                 } else {
262                         ldb_asprintf_errstring(ldb, "partition_metadata: Unable to open %s: %s",
263                                                filename, strerror(errno));
264                 }
265                 if (errno == EACCES || errno == EPERM) {
266                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
267                 }
268                 return LDB_ERR_OPERATIONS_ERROR;
269         }
270
271         talloc_free(tmp_ctx);
272         return LDB_SUCCESS;
273 }
274
275
276 /*
277  * Set the sequence number calculated from older logic (sum of primary sequence
278  * numbers for each partition) as LDB_METADATA_SEQ_NUM key.
279  */
280 static int partition_metadata_set_sequence_number(struct ldb_module *module)
281 {
282         int ret;
283         uint64_t seq_number;
284
285         ret = partition_sequence_number_from_partitions(module, &seq_number);
286         if (ret != LDB_SUCCESS) {
287                 return ret;
288         }
289
290         return partition_metadata_set_uint64(module, LDB_METADATA_SEQ_NUM, seq_number, true);
291 }
292
293
294 /*
295  * Initialize metadata. Load metadata.tdb.
296  * If missing, create it and fill in sequence number
297  */
298 int partition_metadata_init(struct ldb_module *module)
299 {
300         struct partition_private_data *data;
301         int ret;
302
303         data = talloc_get_type_abort(ldb_module_get_private(module),
304                                      struct partition_private_data);
305
306         if (data->metadata != NULL && data->metadata->db != NULL) {
307                 return LDB_SUCCESS;
308         }
309
310         data->metadata = talloc_zero(data, struct partition_metadata);
311         if (data->metadata == NULL) {
312                 return ldb_module_oom(module);
313         }
314
315         ret = partition_metadata_open(module, false);
316         if (ret == LDB_SUCCESS) {
317                 /* Great, we got the DB open */
318                 return LDB_SUCCESS;
319         }
320
321         /* metadata.tdb does not exist, create it */
322         DEBUG(2, ("partition_metadata: Migrating partition metadata: "
323                   "open of metadata.tdb gave: %s\n",
324                   ldb_errstring(ldb_module_get_ctx(module))));
325         ret = partition_metadata_open(module, true);
326         if (ret != LDB_SUCCESS) {
327                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
328                                        "partition_metadata: "
329                                        "Migrating partition metadata: "
330                                        "create of metadata.tdb gave: %s\n",
331                                        ldb_errstring(ldb_module_get_ctx(module)));
332                 TALLOC_FREE(data->metadata);
333                 return ret;
334         }
335
336         return ret;
337 }
338
339
340 /*
341  * Read the sequence number, default to 0 if LDB_METADATA_SEQ_NUM key is missing
342  */
343 int partition_metadata_sequence_number(struct ldb_module *module, uint64_t *value)
344 {
345
346         /* We have to lock all the databases as otherwise we can
347          * return a sequence number that is higher than the DB values
348          * that we can see, as those transactions close after the
349          * metadata.tdb transaction closes */
350         int ret = partition_read_lock(module);
351         if (ret != LDB_SUCCESS) {
352                 return ret;
353         }
354
355         /*
356          * This means we will give a 0 until the first write
357          * tranaction, which is actually pretty reasonable.
358          *
359          * All modern databases will have the metadata.tdb from
360          * the time of the first transaction in provision anyway.
361          */
362         ret = partition_metadata_get_uint64(module,
363                                             LDB_METADATA_SEQ_NUM,
364                                             value,
365                                             0);
366         if (ret == LDB_SUCCESS) {
367                 ret = partition_read_unlock(module);
368         } else {
369                 /* Don't overwrite the error code */
370                 partition_read_unlock(module);
371         }
372         return ret;
373
374 }
375
376
377 /*
378  * Increment the sequence number, returning the new sequence number
379  */
380 int partition_metadata_sequence_number_increment(struct ldb_module *module, uint64_t *value)
381 {
382         struct partition_private_data *data;
383         int ret;
384
385         data = talloc_get_type_abort(ldb_module_get_private(module),
386                                     struct partition_private_data);
387         if (!data || !data->metadata) {
388                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
389                                         "partition_metadata: metadata not initialized");
390         }
391
392         if (data->metadata->in_transaction == 0) {
393                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
394                                         "partition_metadata: increment sequence number without transaction");
395         }
396
397         ret = partition_metadata_get_uint64(module, LDB_METADATA_SEQ_NUM, value, 0);
398         if (ret != LDB_SUCCESS) {
399                 return ret;
400         }
401
402         if (*value == 0) {
403                 /*
404                  * We are in a transaction now, so we can get the
405                  * sequence number from the partitions.
406                  */
407                 ret = partition_metadata_set_sequence_number(module);
408                 if (ret != LDB_SUCCESS) {
409                         return ret;
410                 }
411
412                 ret = partition_metadata_get_uint64(module,
413                                                     LDB_METADATA_SEQ_NUM,
414                                                     value, 0);
415                 if (ret != LDB_SUCCESS) {
416                         return ret;
417                 }
418         }
419
420         (*value)++;
421         ret = partition_metadata_set_uint64(module, LDB_METADATA_SEQ_NUM, *value, false);
422         return ret;
423 }
424 /*
425   lock the database for read - use by partition_lock_read
426 */
427 int partition_metadata_read_lock(struct ldb_module *module)
428 {
429         struct partition_private_data *data
430                 = talloc_get_type_abort(ldb_module_get_private(module),
431                                         struct partition_private_data);
432         struct tdb_context *tdb = NULL;
433         int tdb_ret = 0;
434         int ret;
435
436         if (!data || !data->metadata || !data->metadata->db) {
437                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
438                                         "partition_metadata: metadata not initialized");
439         }
440         tdb = data->metadata->db->tdb;
441
442         if (tdb_transaction_active(tdb) == false &&
443             data->metadata->read_lock_count == 0) {
444                 tdb_ret = tdb_lockall_read(tdb);
445         }
446         if (tdb_ret == 0) {
447                 data->metadata->read_lock_count++;
448                 return LDB_SUCCESS;
449         } else {
450                 /* Sadly we can't call ltdb_err_map(tdb_error(tdb)); */
451                 ret = LDB_ERR_BUSY;
452         }
453         ldb_debug_set(ldb_module_get_ctx(module),
454                       LDB_DEBUG_FATAL,
455                       "Failure during partition_metadata_read_lock(): %s",
456                       tdb_errorstr(tdb));
457         return ret;
458 }
459
460 /*
461   unlock the database after a partition_metadata_lock_read()
462 */
463 int partition_metadata_read_unlock(struct ldb_module *module)
464 {
465         struct partition_private_data *data
466                 = talloc_get_type_abort(ldb_module_get_private(module),
467                                         struct partition_private_data);
468         struct tdb_context *tdb = NULL;
469
470         data = talloc_get_type_abort(ldb_module_get_private(module),
471                                      struct partition_private_data);
472         if (!data || !data->metadata || !data->metadata->db) {
473                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
474                                         "partition_metadata: metadata not initialized");
475         }
476         tdb = data->metadata->db->tdb;
477
478         if (!tdb_transaction_active(tdb) &&
479             data->metadata->read_lock_count == 1) {
480                 tdb_unlockall_read(tdb);
481                 data->metadata->read_lock_count--;
482                 return 0;
483         }
484         data->metadata->read_lock_count--;
485         return 0;
486 }
487
488
489 /*
490  * Transaction start
491  */
492 int partition_metadata_start_trans(struct ldb_module *module)
493 {
494         struct partition_private_data *data;
495         struct tdb_context *tdb;
496
497         data = talloc_get_type_abort(ldb_module_get_private(module),
498                                      struct partition_private_data);
499         if (!data || !data->metadata || !data->metadata->db) {
500                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
501                                         "partition_metadata: metadata not initialized");
502         }
503         tdb = data->metadata->db->tdb;
504
505         if (tdb_transaction_start(tdb) != 0) {
506                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
507                                         tdb_errorstr(tdb));
508         }
509
510         data->metadata->in_transaction++;
511
512         return LDB_SUCCESS;
513 }
514
515
516 /*
517  * Transaction prepare commit
518  */
519 int partition_metadata_prepare_commit(struct ldb_module *module)
520 {
521         struct partition_private_data *data;
522         struct tdb_context *tdb;
523
524         data = talloc_get_type_abort(ldb_module_get_private(module),
525                                      struct partition_private_data);
526         if (!data || !data->metadata || !data->metadata->db) {
527                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
528                                         "partition_metadata: metadata not initialized");
529         }
530         tdb = data->metadata->db->tdb;
531
532         if (data->metadata->in_transaction == 0) {
533                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
534                                         "partition_metadata: not in transaction");
535         }
536
537         if (tdb_transaction_prepare_commit(tdb) != 0) {
538                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
539                                         tdb_errorstr(tdb));
540         }
541
542         return LDB_SUCCESS;
543 }
544
545
546 /*
547  * Transaction end
548  */
549 int partition_metadata_end_trans(struct ldb_module *module)
550 {
551         struct partition_private_data *data;
552         struct tdb_context *tdb;
553
554         data = talloc_get_type_abort(ldb_module_get_private(module),
555                                      struct partition_private_data);
556         if (!data || !data->metadata || !data->metadata->db) {
557                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
558                                         "partition_metadata: metadata not initialized");
559         }
560         tdb = data->metadata->db->tdb;
561
562         if (data->metadata->in_transaction == 0) {
563                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
564                                         "partition_metadata: not in transaction");
565         }
566
567         data->metadata->in_transaction--;
568
569         if (tdb_transaction_commit(tdb) != 0) {
570                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
571                                         tdb_errorstr(tdb));
572         }
573
574         return LDB_SUCCESS;
575 }
576
577
578 /*
579  * Transaction delete
580  */
581 int partition_metadata_del_trans(struct ldb_module *module)
582 {
583         struct partition_private_data *data;
584         struct tdb_context *tdb;
585
586         data = talloc_get_type_abort(ldb_module_get_private(module),
587                                      struct partition_private_data);
588         if (!data || !data->metadata || !data->metadata->db) {
589                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
590                                         "partition_metadata: metadata not initialized");
591         }
592         tdb = data->metadata->db->tdb;
593
594         if (data->metadata->in_transaction == 0) {
595                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
596                                         "partition_metadata: not in transaction");
597         }
598
599         data->metadata->in_transaction--;
600
601         tdb_transaction_cancel(tdb);
602
603         return LDB_SUCCESS;
604 }