merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_ltdb.c
1 /* 
2    ctdb ltdb code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   find an attached ctdb_db handle given a name
32  */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
34 {
35         struct ctdb_db_context *tmp_db;
36         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37                 if (strcmp(name, tmp_db->db_name) == 0) {
38                         return tmp_db;
39                 }
40         }
41         return NULL;
42 }
43
44
45 /*
46   this is the dummy null procedure that all databases support
47 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
49 {
50         return 0;
51 }
52
53 /*
54   this is a plain fetch procedure that all databases support
55 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = &call->record_data;
59         return 0;
60 }
61
62
63 /*
64   return the lmaster given a key
65 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
67 {
68         uint32_t idx, lmaster;
69
70         idx = ctdb_hash(key) % ctdb->vnn_map->size;
71         lmaster = ctdb->vnn_map->map[idx];
72
73         return lmaster;
74 }
75
76
77 /*
78   construct an initial header for a record with no ltdb header yet
79 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, 
81                                 TDB_DATA key,
82                                 struct ctdb_ltdb_header *header)
83 {
84         header->rsn = 0;
85         /* initial dmaster is the lmaster */
86         header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87         header->laccessor = header->dmaster;
88         header->lacount = 0;
89 }
90
91
92 /*
93   fetch a record from the ltdb, separating out the header information
94   and returning the body of the record. A valid (initial) header is
95   returned if the record is not present
96 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, 
98                     TDB_DATA key, struct ctdb_ltdb_header *header, 
99                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
100 {
101         TDB_DATA rec;
102         struct ctdb_context *ctdb = ctdb_db->ctdb;
103
104         rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105         if (rec.dsize < sizeof(*header)) {
106                 TDB_DATA d2;
107                 /* return an initial header */
108                 if (rec.dptr) free(rec.dptr);
109                 ltdb_initial_header(ctdb_db, key, header);
110                 ZERO_STRUCT(d2);
111                 if (data) {
112                         *data = d2;
113                 }
114                 ctdb_ltdb_store(ctdb_db, key, header, d2);
115                 return 0;
116         }
117
118         *header = *(struct ctdb_ltdb_header *)rec.dptr;
119
120         if (data) {
121                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122                 data->dptr = talloc_memdup(mem_ctx, 
123                                            sizeof(struct ctdb_ltdb_header)+rec.dptr,
124                                            data->dsize);
125         }
126
127         free(rec.dptr);
128         if (data) {
129                 CTDB_NO_MEMORY(ctdb, data->dptr);
130         }
131
132         return 0;
133 }
134
135
136 /*
137   fetch a record from the ltdb, separating out the header information
138   and returning the body of the record. A valid (initial) header is
139   returned if the record is not present
140 */
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
142                     struct ctdb_ltdb_header *header, TDB_DATA data)
143 {
144         struct ctdb_context *ctdb = ctdb_db->ctdb;
145         TDB_DATA rec;
146         int ret;
147
148         rec.dsize = sizeof(*header) + data.dsize;
149         rec.dptr = talloc_size(ctdb, rec.dsize);
150         CTDB_NO_MEMORY(ctdb, rec.dptr);
151
152         memcpy(rec.dptr, header, sizeof(*header));
153         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
154
155         ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
156         talloc_free(rec.dptr);
157
158         return ret;
159 }
160
161
162 /*
163   lock a record in the ltdb, given a key
164  */
165 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
166 {
167         return tdb_chainlock(ctdb_db->ltdb->tdb, key);
168 }
169
170 /*
171   unlock a record in the ltdb, given a key
172  */
173 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
174 {
175         int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
176         if (ret != 0) {
177                 DEBUG(0,("tdb_chainunlock failed\n"));
178         }
179         return ret;
180 }
181
182 struct lock_fetch_state {
183         struct ctdb_context *ctdb;
184         void (*recv_pkt)(void *, uint8_t *, uint32_t);
185         void *recv_context;
186         struct ctdb_req_header *hdr;
187 };
188
189 /*
190   called when we should retry the operation
191  */
192 static void lock_fetch_callback(void *p)
193 {
194         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
195         state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
196         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
197 }
198
199
200 /*
201   do a non-blocking ltdb_lock, deferring this ctdb request until we
202   have the chainlock
203
204   It does the following:
205
206    1) tries to get the chainlock. If it succeeds, then it returns 0
207
208    2) if it fails to get a chainlock immediately then it sets up a
209    non-blocking chainlock via ctdb_lockwait, and when it gets the
210    chainlock it re-submits this ctdb request to the main packet
211    receive function
212
213    This effectively queues all ctdb requests that cannot be
214    immediately satisfied until it can get the lock. This means that
215    the main ctdb daemon will not block waiting for a chainlock held by
216    a client
217
218    There are 3 possible return values:
219
220        0:    means that it got the lock immediately.
221       -1:    means that it failed to get the lock, and won't retry
222       -2:    means that it failed to get the lock immediately, but will retry
223  */
224 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
225                            TDB_DATA key, struct ctdb_req_header *hdr,
226                            void (*recv_pkt)(void *, uint8_t *, uint32_t ),
227                            void *recv_context)
228 {
229         int ret;
230         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
231         struct lockwait_handle *h;
232         struct lock_fetch_state *state;
233         
234         ret = tdb_chainlock_nonblock(tdb, key);
235
236         if (ret != 0 &&
237             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
238                 /* a hard failure - don't try again */
239                 return -1;
240         }
241
242         /* when torturing, ensure we test the contended path */
243         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
244             random() % 5 == 0) {
245                 ret = -1;
246                 tdb_chainunlock(tdb, key);
247         }
248
249         /* first the non-contended path */
250         if (ret == 0) {
251                 return 0;
252         }
253
254         state = talloc(hdr, struct lock_fetch_state);
255         state->ctdb = ctdb_db->ctdb;
256         state->hdr = hdr;
257         state->recv_pkt = recv_pkt;
258         state->recv_context = recv_context;
259
260         /* now the contended path */
261         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
262         if (h == NULL) {
263                 tdb_chainunlock(tdb, key);
264                 return -1;
265         }
266
267         /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
268            so it won't be freed yet */
269         talloc_steal(state, hdr);
270         talloc_steal(state, h);
271
272         /* now tell the caller than we will retry asynchronously */
273         return -2;
274 }
275
276 /*
277   a varient of ctdb_ltdb_lock_requeue that also fetches the record
278  */
279 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
280                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
281                                  struct ctdb_req_header *hdr, TDB_DATA *data,
282                                  void (*recv_pkt)(void *, uint8_t *, uint32_t ),
283                                  void *recv_context)
284 {
285         int ret;
286
287         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
288         if (ret == 0) {
289                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
290                 if (ret != 0) {
291                         ctdb_ltdb_unlock(ctdb_db, key);
292                 }
293         }
294         return ret;
295 }
296
297
298 /*
299   a client has asked to attach a new database
300  */
301 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
302                                TDB_DATA *outdata)
303 {
304         const char *db_name = (const char *)indata.dptr;
305         struct ctdb_db_context *ctdb_db, *tmp_db;
306         int ret;
307
308         /* see if we already have this name */
309         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
310                 if (strcmp(db_name, tmp_db->db_name) == 0) {
311                         /* this is not an error */
312                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
313                         outdata->dsize = sizeof(tmp_db->db_id);
314                         return 0;
315                 }
316         }
317
318         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
319         CTDB_NO_MEMORY(ctdb, ctdb_db);
320
321         ctdb_db->ctdb = ctdb;
322         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
323         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
324
325         ctdb_db->db_id = ctdb_hash(&indata);
326
327         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
328         outdata->dsize = sizeof(ctdb_db->db_id);
329
330         /* check for hash collisions */
331         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
332                 if (tmp_db->db_id == ctdb_db->db_id) {
333                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
334                                  db_name, tmp_db->db_name));
335                         talloc_free(ctdb_db);
336                         return -1;
337                 }
338         }
339
340         if (ctdb->db_directory == NULL) {
341                 ctdb->db_directory = VARDIR "/ctdb";
342         }
343
344         /* make sure the db directory exists */
345         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
346                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
347                          ctdb->db_directory));
348                 talloc_free(ctdb_db);
349                 return -1;
350         }
351
352         /* open the database */
353         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
354                                            ctdb->db_directory, 
355                                            db_name, ctdb->vnn);
356
357         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
358                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
359         if (ctdb_db->ltdb == NULL) {
360                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
361                 talloc_free(ctdb_db);
362                 return -1;
363         }
364
365         DLIST_ADD(ctdb->db_list, ctdb_db);
366
367         /* 
368            all databases support the "null" function. we need this in
369            order to do forced migration of records
370         */
371         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
372         if (ret != 0) {
373                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
374                 talloc_free(ctdb_db);
375                 return -1;
376         }
377
378         /* 
379            all databases support the "fetch" function. we need this
380            for efficient Samba3 ctdb fetch
381         */
382         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
383         if (ret != 0) {
384                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
385                 talloc_free(ctdb_db);
386                 return -1;
387         }
388         
389         /* tell all the other nodes about this database */
390         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
391                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
392                                  indata, NULL, NULL);
393
394         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
395
396         /* success */
397         return 0;
398 }
399
400 /*
401   called when a broadcast seqnum update comes in
402  */
403 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
404 {
405         struct ctdb_db_context *ctdb_db;
406         if (srcnode == ctdb->vnn) {
407                 /* don't update ourselves! */
408                 return 0;
409         }
410
411         ctdb_db = find_ctdb_db(ctdb, db_id);
412         if (!ctdb_db) {
413                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
414                 return -1;
415         }
416
417         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
418         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
419         return 0;
420 }
421
422 /*
423   timer to check for seqnum changes in a ltdb and propogate them
424  */
425 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
426                                    struct timeval t, void *p)
427 {
428         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
429         struct ctdb_context *ctdb = ctdb_db->ctdb;
430         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
431         if (new_seqnum != ctdb_db->seqnum) {
432                 /* something has changed - propogate it */
433                 TDB_DATA data;
434                 data.dptr = (uint8_t *)&ctdb_db->db_id;
435                 data.dsize = sizeof(uint32_t);
436                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
437                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
438                                          data, NULL, NULL);             
439         }
440         ctdb_db->seqnum = new_seqnum;
441
442         /* setup a new timer */
443         ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
444                                       timeval_current_ofs(ctdb->seqnum_frequency, 0),
445                                       ctdb_ltdb_seqnum_check, ctdb_db);
446 }
447
448 /*
449   enable seqnum handling on this db
450  */
451 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
452 {
453         struct ctdb_db_context *ctdb_db;
454         ctdb_db = find_ctdb_db(ctdb, db_id);
455         if (!ctdb_db) {
456                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
457                 return -1;
458         }
459
460         if (ctdb_db->te == NULL) {
461                 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db, 
462                                               timeval_current_ofs(ctdb->seqnum_frequency, 0),
463                                               ctdb_ltdb_seqnum_check, ctdb_db);
464         }
465
466         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
467         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
468         return 0;
469 }
470
471 /*
472   enable seqnum handling on this db
473  */
474 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
475 {
476         ctdb->seqnum_frequency = frequency;
477         return 0;
478 }