- start moving tunable variables into their own structure
[metze/ctdb/wip.git] / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53 /*
54   this is a plain fetch procedure that all databases support
55 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = &call->record_data;
59         return 0;
60 }
61
62
63 /*
64   return the lmaster given a key
65 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
67 {
68         uint32_t idx, lmaster;
69
70         idx = ctdb_hash(key) % ctdb->vnn_map->size;
71         lmaster = ctdb->vnn_map->map[idx];
72
73         return lmaster;
74 }
75
76
77 /*
78   construct an initial header for a record with no ltdb header yet
79 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
81                                 TDB_DATA key,
82                                 struct ctdb_ltdb_header *header)
83 {
84         header->rsn = 0;
85         /* initial dmaster is the lmaster */
86         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87         header->laccessor = header->dmaster;
88         header->lacount = 0;
89 }
90
91
92 /*
93   fetch a record from the ltdb, separating out the header information
94   and returning the body of the record. A valid (initial) header is
95   returned if the record is not present
96 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
98                     TDB_DATA key, struct ctdb_ltdb_header *header, 
99                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
100 {
101         TDB_DATA rec;
102         struct ctdb_context *ctdb = ctdb_db->ctdb;
103
104         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105         if (rec.dsize < sizeof(*header)) {
106                 TDB_DATA d2;
107                 /* return an initial header */
108                 if (rec.dptr) free(rec.dptr);
109                 if (ctdb->vnn_map == NULL) {
110                         /* called from the client */
111                         ZERO_STRUCTP(data);
112                         header->dmaster = (uint32_t)-1;
113                         return -1;
114                 }
115                 ltdb_initial_header(ctdb_db, key, header);
116                 ZERO_STRUCT(d2);
117                 if (data) {
118                         *data = d2;
119                 }
120                 ctdb_ltdb_store(ctdb_db, key, header, d2);
121                 return 0;
122         }
123
124         *header = *(struct ctdb_ltdb_header *)rec.dptr;
125
126         if (data) {
127                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
128                 data->dptr = talloc_memdup(mem_ctx, 
129                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
130                                            data->dsize);
131         }
132
133         free(rec.dptr);
134         if (data) {
135                 CTDB_NO_MEMORY(ctdb, data->dptr);
136         }
137
138         return 0;
139 }
140
141
142 /*
143   fetch a record from the ltdb, separating out the header information
144   and returning the body of the record. A valid (initial) header is
145   returned if the record is not present
146 */
147 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
148                     struct ctdb_ltdb_header *header, TDB_DATA data)
149 {
150         struct ctdb_context *ctdb = ctdb_db->ctdb;
151         TDB_DATA rec;
152         int ret;
153
154         if (ctdb->flags & CTDB_FLAG_TORTURE) {
155                 struct ctdb_ltdb_header *h2;
156                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
157                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
158                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
159                         DEBUG(0,("RSN regression! %llu %llu\n",
160                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
161                 }
162                 if (rec.dptr) free(rec.dptr);
163         }
164
165         rec.dsize = sizeof(*header) + data.dsize;
166         rec.dptr = talloc_size(ctdb, rec.dsize);
167         CTDB_NO_MEMORY(ctdb, rec.dptr);
168
169         memcpy(rec.dptr, header, sizeof(*header));
170         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
171
172         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
173         talloc_free(rec.dptr);
174
175         return ret;
176 }
177
178
179 /*
180   lock a record in the ltdb, given a key
181  */
182 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
183 {
184         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
185 }
186
187 /*
188   unlock a record in the ltdb, given a key
189  */
190 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
191 {
192         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
193         if (ret != 0) {
194                 DEBUG(0,("tdb_chainunlock failed\n"));
195         }
196         return ret;
197 }
198
199 struct lock_fetch_state {
200         struct ctdb_context *ctdb;
201         void (*recv_pkt)(void *, struct ctdb_req_header *);
202         void *recv_context;
203         struct ctdb_req_header *hdr;
204         uint32_t generation;
205         bool ignore_generation;
206 };
207
208 /*
209   called when we should retry the operation
210  */
211 static void lock_fetch_callback(void *p)
212 {
213         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
214         if (!state->ignore_generation &&
215             state->generation != state->ctdb->vnn_map->generation) {
216                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
217                 talloc_free(state->hdr);
218                 return;
219         }
220         state->recv_pkt(state->recv_context, state->hdr);
221         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
222 }
223
224
225 /*
226   do a non-blocking ltdb_lock, deferring this ctdb request until we
227   have the chainlock
228
229   It does the following:
230
231    1) tries to get the chainlock. If it succeeds, then it returns 0
232
233    2) if it fails to get a chainlock immediately then it sets up a
234    non-blocking chainlock via ctdb_lockwait, and when it gets the
235    chainlock it re-submits this ctdb request to the main packet
236    receive function
237
238    This effectively queues all ctdb requests that cannot be
239    immediately satisfied until it can get the lock. This means that
240    the main ctdb daemon will not block waiting for a chainlock held by
241    a client
242
243    There are 3 possible return values:
244
245        0:    means that it got the lock immediately.
246       -1:    means that it failed to get the lock, and won't retry
247       -2:    means that it failed to get the lock immediately, but will retry
248  */
249 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
250                            TDB_DATA key, struct ctdb_req_header *hdr,
251                            void (*recv_pkt)(void *, struct ctdb_req_header *),
252                            void *recv_context, bool ignore_generation)
253 {
254         int ret;
255         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
256         struct lockwait_handle *h;
257         struct lock_fetch_state *state;
258         
259         ret = tdb_chainlock_nonblock(tdb, key);
260
261         if (ret != 0 &&
262             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
263                 /* a hard failure - don't try again */
264                 return -1;
265         }
266
267         /* when torturing, ensure we test the contended path */
268         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
269             random() % 5 == 0) {
270                 ret = -1;
271                 tdb_chainunlock(tdb, key);
272         }
273
274         /* first the non-contended path */
275         if (ret == 0) {
276                 return 0;
277         }
278
279         state = talloc(hdr, struct lock_fetch_state);
280         state->ctdb = ctdb_db->ctdb;
281         state->hdr = hdr;
282         state->recv_pkt = recv_pkt;
283         state->recv_context = recv_context;
284         state->generation = ctdb_db->ctdb->vnn_map->generation;
285         state->ignore_generation = ignore_generation;
286
287         /* now the contended path */
288         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
289         if (h == NULL) {
290                 tdb_chainunlock(tdb, key);
291                 return -1;
292         }
293
294         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
295            so it won't be freed yet */
296         talloc_steal(state, hdr);
297         talloc_steal(state, h);
298
299         /* now tell the caller than we will retry asynchronously */
300         return -2;
301 }
302
303 /*
304   a varient of ctdb_ltdb_lock_requeue that also fetches the record
305  */
306 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
307                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
308                                  struct ctdb_req_header *hdr, TDB_DATA *data,
309                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
310                                  void *recv_context, bool ignore_generation)
311 {
312         int ret;
313
314         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
315                                      recv_context, ignore_generation);
316         if (ret == 0) {
317                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
318                 if (ret != 0) {
319                         ctdb_ltdb_unlock(ctdb_db, key);
320                 }
321         }
322         return ret;
323 }
324
325
326 /*
327   paraoid check to see if the db is empty
328  */
329 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
330 {
331         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
332         int count = tdb_traverse_read(tdb, NULL, NULL);
333         if (count != 0) {
334                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
335                          ctdb_db->db_path));
336                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
337         }
338 }
339
340 /*
341   a client has asked to attach a new database
342  */
343 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
344                                TDB_DATA *outdata)
345 {
346         const char *db_name = (const char *)indata.dptr;
347         struct ctdb_db_context *ctdb_db, *tmp_db;
348         int ret;
349
350         /* see if we already have this name */
351         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
352                 if (strcmp(db_name, tmp_db->db_name) == 0) {
353                         /* this is not an error */
354                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
355                         outdata->dsize = sizeof(tmp_db->db_id);
356                         return 0;
357                 }
358         }
359
360         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
361         CTDB_NO_MEMORY(ctdb, ctdb_db);
362
363         ctdb_db->ctdb = ctdb;
364         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
365         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
366
367         ctdb_db->db_id = ctdb_hash(&indata);
368
369         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
370         outdata->dsize = sizeof(ctdb_db->db_id);
371
372         /* check for hash collisions */
373         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
374                 if (tmp_db->db_id == ctdb_db->db_id) {
375                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
376                                  tmp_db->db_id, db_name, tmp_db->db_name));
377                         talloc_free(ctdb_db);
378                         return -1;
379                 }
380         }
381
382         if (ctdb->db_directory == NULL) {
383                 ctdb->db_directory = VARDIR "/ctdb";
384         }
385
386         /* make sure the db directory exists */
387         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
388                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
389                          ctdb->db_directory));
390                 talloc_free(ctdb_db);
391                 return -1;
392         }
393
394         /* open the database */
395         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
396                                            ctdb->db_directory, 
397                                            db_name, ctdb->vnn);
398
399         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
400                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
401         if (ctdb_db->ltdb == NULL) {
402                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
403                 talloc_free(ctdb_db);
404                 return -1;
405         }
406
407         ctdb_check_db_empty(ctdb_db);
408
409         DLIST_ADD(ctdb->db_list, ctdb_db);
410
411         /* 
412            all databases support the "null" function. we need this in
413            order to do forced migration of records
414         */
415         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
416         if (ret != 0) {
417                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
418                 talloc_free(ctdb_db);
419                 return -1;
420         }
421
422         /* 
423            all databases support the "fetch" function. we need this
424            for efficient Samba3 ctdb fetch
425         */
426         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
427         if (ret != 0) {
428                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
429                 talloc_free(ctdb_db);
430                 return -1;
431         }
432         
433         /* tell all the other nodes about this database */
434         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
435                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
436                                  indata, NULL, NULL);
437
438         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
439
440         /* success */
441         return 0;
442 }
443
444 /*
445   called when a broadcast seqnum update comes in
446  */
447 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
448 {
449         struct ctdb_db_context *ctdb_db;
450         if (srcnode == ctdb->vnn) {
451                 /* don't update ourselves! */
452                 return 0;
453         }
454
455         ctdb_db = find_ctdb_db(ctdb, db_id);
456         if (!ctdb_db) {
457                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
458                 return -1;
459         }
460
461         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
462         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
463         return 0;
464 }
465
466 /*
467   timer to check for seqnum changes in a ltdb and propogate them
468  */
469 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
470                                    struct timeval t, void *p)
471 {
472         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
473         struct ctdb_context *ctdb = ctdb_db->ctdb;
474         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
475         if (new_seqnum != ctdb_db->seqnum) {
476                 /* something has changed - propogate it */
477                 TDB_DATA data;
478                 data.dptr = (uint8_t *)&ctdb_db->db_id;
479                 data.dsize = sizeof(uint32_t);
480                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
481                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
482                                          data, NULL, NULL);             
483         }
484         ctdb_db->seqnum = new_seqnum;
485
486         /* setup a new timer */
487         ctdb_db->te = 
488                 event_add_timed(ctdb->ev, ctdb_db, 
489                                 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
490                                 ctdb_ltdb_seqnum_check, ctdb_db);
491 }
492
493 /*
494   enable seqnum handling on this db
495  */
496 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
497 {
498         struct ctdb_db_context *ctdb_db;
499         ctdb_db = find_ctdb_db(ctdb, db_id);
500         if (!ctdb_db) {
501                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
502                 return -1;
503         }
504
505         if (ctdb_db->te == NULL) {
506                 ctdb_db->te = 
507                         event_add_timed(ctdb->ev, ctdb_db, 
508                                         timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
509                                         ctdb_ltdb_seqnum_check, ctdb_db);
510         }
511
512         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
513         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
514         return 0;
515 }
516