ctdb-daemon: Refactor calculation of tdb open flags based on database type
[sfrench/samba-autobuild/.git] / ctdb / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie sahlberg  2011
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24
25 #include <tdb.h>
26
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30
31 #include "ctdb_private.h"
32
33 #include "common/common.h"
34 #include "common/logging.h"
35
36
37 /*
38  * Calculate tdb flags based on databse type
39  */
40 int ctdb_db_tdb_flags(uint8_t db_flags, bool with_valgrind, bool with_mutex)
41 {
42         int tdb_flags = 0;
43
44         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
45                 tdb_flags = TDB_DEFAULT;
46         } else {
47                 tdb_flags = TDB_NOSYNC |
48                             TDB_CLEAR_IF_FIRST |
49                             TDB_INCOMPATIBLE_HASH;
50
51 #ifdef TDB_MUTEX_LOCKING
52                 if (with_mutex && tdb_runtime_check_for_robust_mutexes()) {
53                         tdb_flags |= TDB_MUTEX_LOCKING;
54                 }
55 #endif
56
57         }
58
59         tdb_flags |= TDB_DISALLOW_NESTING;
60         if (with_valgrind) {
61                 tdb_flags |= TDB_NOMMAP;
62         }
63
64         return tdb_flags;
65 }
66
67 /*
68   find an attached ctdb_db handle given a name
69  */
70 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
71 {
72         struct ctdb_db_context *tmp_db;
73         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
74                 if (strcmp(name, tmp_db->db_name) == 0) {
75                         return tmp_db;
76                 }
77         }
78         return NULL;
79 }
80
81
82 /*
83   return the lmaster given a key
84 */
85 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
86 {
87         uint32_t idx, lmaster;
88
89         idx = ctdb_hash(key) % ctdb->vnn_map->size;
90         lmaster = ctdb->vnn_map->map[idx];
91
92         return lmaster;
93 }
94
95
96 /*
97   construct an initial header for a record with no ltdb header yet
98 */
99 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
100                                 TDB_DATA key,
101                                 struct ctdb_ltdb_header *header)
102 {
103         ZERO_STRUCTP(header);
104         /* initial dmaster is the lmaster */
105         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
106         header->flags = CTDB_REC_FLAG_AUTOMATIC;
107 }
108
109
110 /*
111   fetch a record from the ltdb, separating out the header information
112   and returning the body of the record. A valid (initial) header is
113   returned if the record is not present
114 */
115 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
116                     TDB_DATA key, struct ctdb_ltdb_header *header, 
117                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
118 {
119         TDB_DATA rec;
120         struct ctdb_context *ctdb = ctdb_db->ctdb;
121
122         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
123         if (rec.dsize < sizeof(*header)) {
124                 /* return an initial header */
125                 if (rec.dptr) free(rec.dptr);
126                 if (ctdb->vnn_map == NULL) {
127                         /* called from the client */
128                         ZERO_STRUCTP(data);
129                         header->dmaster = (uint32_t)-1;
130                         return -1;
131                 }
132                 ltdb_initial_header(ctdb_db, key, header);
133                 if (data) {
134                         *data = tdb_null;
135                 }
136                 if (ctdb_db->persistent || header->dmaster == ctdb_db->ctdb->pnn) {
137                         if (ctdb_ltdb_store(ctdb_db, key, header, tdb_null) != 0) {
138                                 DEBUG(DEBUG_NOTICE,
139                                       (__location__ "failed to store initial header\n"));
140                         }
141                 }
142                 return 0;
143         }
144
145         *header = *(struct ctdb_ltdb_header *)rec.dptr;
146
147         if (data) {
148                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
149                 data->dptr = talloc_memdup(mem_ctx, 
150                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
151                                            data->dsize);
152         }
153
154         free(rec.dptr);
155         if (data) {
156                 CTDB_NO_MEMORY(ctdb, data->dptr);
157         }
158
159         return 0;
160 }
161
162 /*
163   fetch a record from the ltdb, separating out the header information
164   and returning the body of the record.
165   if the record does not exist, *header will be NULL
166   and data = {0, NULL}
167 */
168 int ctdb_ltdb_fetch_with_header(struct ctdb_db_context *ctdb_db, 
169                     TDB_DATA key, struct ctdb_ltdb_header *header, 
170                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
171 {
172         TDB_DATA rec;
173
174         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
175         if (rec.dsize < sizeof(*header)) {
176                 free(rec.dptr);
177
178                 data->dsize = 0;
179                 data->dptr = NULL;
180                 return -1;
181         }
182
183         *header = *(struct ctdb_ltdb_header *)rec.dptr;
184         if (data) {
185                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
186                 data->dptr = talloc_memdup(mem_ctx, 
187                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
188                                            data->dsize);
189         }
190
191         free(rec.dptr);
192
193         return 0;
194 }
195
196
197 /*
198   write a record to a normal database
199 */
200 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
201                     struct ctdb_ltdb_header *header, TDB_DATA data)
202 {
203         struct ctdb_context *ctdb = ctdb_db->ctdb;
204         TDB_DATA rec[2];
205         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
206         int ret;
207         bool seqnum_suppressed = false;
208
209         if (ctdb_db->ctdb_ltdb_store_fn) {
210                 return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
211         }
212
213         if (ctdb->flags & CTDB_FLAG_TORTURE) {
214                 TDB_DATA old;
215                 struct ctdb_ltdb_header *h2;
216
217                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
218                 h2 = (struct ctdb_ltdb_header *)old.dptr;
219                 if (old.dptr != NULL && old.dsize >= hsize &&
220                     h2->rsn > header->rsn) {
221                         DEBUG(DEBUG_ERR,
222                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
223                                h2->rsn, header->rsn));
224                 }
225                 if (old.dptr != NULL) {
226                         free(old.dptr);
227                 }
228         }
229
230         rec[0].dsize = hsize;
231         rec[0].dptr = (uint8_t *)header;
232
233         rec[1].dsize = data.dsize;
234         rec[1].dptr = data.dptr;
235
236         /* Databases with seqnum updates enabled only get their seqnum
237            changes when/if we modify the data */
238         if (ctdb_db->seqnum_update != NULL) {
239                 TDB_DATA old;
240                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
241
242                 if ((old.dsize == hsize + data.dsize) &&
243                     memcmp(old.dptr+hsize, data.dptr, data.dsize) == 0) {
244                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
245                         seqnum_suppressed = true;
246                 }
247                 if (old.dptr != NULL) {
248                         free(old.dptr);
249                 }
250         }
251         ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
252         if (ret != 0) {
253                 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
254         }
255         if (seqnum_suppressed) {
256                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
257         }
258
259         return ret;
260 }
261
262 /*
263   lock a record in the ltdb, given a key
264  */
265 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
266 {
267         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
268 }
269
270 /*
271   unlock a record in the ltdb, given a key
272  */
273 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
274 {
275         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
276         if (ret != 0) {
277                 DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
278         }
279         return ret;
280 }
281
282
283 /*
284   delete a record from a normal database
285 */
286 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
287 {
288         if (ctdb_db->persistent != 0) {
289                 DEBUG(DEBUG_ERR,("Trying to delete empty record in persistent database\n"));
290                 return 0;
291         }
292         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
293                 DEBUG(DEBUG_ERR,("Failed to delete empty record."));
294                 return -1;
295         }
296         return 0;
297 }
298
299 int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn)
300 {
301         int byte_pos = pnn / 8;
302         int bit_mask   = 1 << (pnn % 8);
303
304         if (byte_pos + 1 > data->dsize) {
305                 char *buf;
306
307                 buf = malloc(byte_pos + 1);
308                 memset(buf, 0, byte_pos + 1);
309                 if (buf == NULL) {
310                         DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos + 1));
311                         return -1;
312                 }
313                 if (data->dptr != NULL) {
314                         memcpy(buf, data->dptr, data->dsize);
315                         free(data->dptr);
316                 }
317                 data->dptr  = (uint8_t *)buf;
318                 data->dsize = byte_pos + 1;
319         }
320
321         data->dptr[byte_pos] |= bit_mask;
322         return 0;
323 }
324
325 void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data)
326 {
327         int i;
328
329         for(i = 0; i < data.dsize; i++) {
330                 int j;
331
332                 for (j=0; j<8; j++) {
333                         int mask = 1<<j;
334
335                         if (data.dptr[i] & mask) {
336                                 cb(ctdb, i * 8 + j, private_data);
337                         }
338                 }
339         }
340 }
341
342 /*
343   this is the dummy null procedure that all databases support
344 */
345 int ctdb_null_func(struct ctdb_call_info *call)
346 {
347         return 0;
348 }
349
350 /*
351   this is a plain fetch procedure that all databases support
352 */
353 int ctdb_fetch_func(struct ctdb_call_info *call)
354 {
355         call->reply_data = &call->record_data;
356         return 0;
357 }
358
359 /*
360   this is a plain fetch procedure that all databases support
361   this returns the full record including the ltdb header
362 */
363 int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
364 {
365         call->reply_data = talloc(call, TDB_DATA);
366         if (call->reply_data == NULL) {
367                 return -1;
368         }
369         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
370         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
371         if (call->reply_data->dptr == NULL) {
372                 return -1;
373         }
374         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
375         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
376
377         return 0;
378 }
379