Merge commit 'martins/master'
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   this is the dummy null procedure that all databases support
32 */
33 static int ctdb_null_func(struct ctdb_call_info *call)
34 {
35         return 0;
36 }
37
38 /*
39   this is a plain fetch procedure that all databases support
40 */
41 static int ctdb_fetch_func(struct ctdb_call_info *call)
42 {
43         call->reply_data = &call->record_data;
44         return 0;
45 }
46
47
48
49 struct lock_fetch_state {
50         struct ctdb_context *ctdb;
51         void (*recv_pkt)(void *, struct ctdb_req_header *);
52         void *recv_context;
53         struct ctdb_req_header *hdr;
54         uint32_t generation;
55         bool ignore_generation;
56 };
57
58 /*
59   called when we should retry the operation
60  */
61 static void lock_fetch_callback(void *p)
62 {
63         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
64         if (!state->ignore_generation &&
65             state->generation != state->ctdb->vnn_map->generation) {
66                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
67                 talloc_free(state->hdr);
68                 return;
69         }
70         state->recv_pkt(state->recv_context, state->hdr);
71         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
72 }
73
74
75 /*
76   do a non-blocking ltdb_lock, deferring this ctdb request until we
77   have the chainlock
78
79   It does the following:
80
81    1) tries to get the chainlock. If it succeeds, then it returns 0
82
83    2) if it fails to get a chainlock immediately then it sets up a
84    non-blocking chainlock via ctdb_lockwait, and when it gets the
85    chainlock it re-submits this ctdb request to the main packet
86    receive function
87
88    This effectively queues all ctdb requests that cannot be
89    immediately satisfied until it can get the lock. This means that
90    the main ctdb daemon will not block waiting for a chainlock held by
91    a client
92
93    There are 3 possible return values:
94
95        0:    means that it got the lock immediately.
96       -1:    means that it failed to get the lock, and won't retry
97       -2:    means that it failed to get the lock immediately, but will retry
98  */
99 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
100                            TDB_DATA key, struct ctdb_req_header *hdr,
101                            void (*recv_pkt)(void *, struct ctdb_req_header *),
102                            void *recv_context, bool ignore_generation)
103 {
104         int ret;
105         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
106         struct lockwait_handle *h;
107         struct lock_fetch_state *state;
108         
109         ret = tdb_chainlock_nonblock(tdb, key);
110
111         if (ret != 0 &&
112             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
113                 /* a hard failure - don't try again */
114                 return -1;
115         }
116
117         /* when torturing, ensure we test the contended path */
118         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
119             random() % 5 == 0) {
120                 ret = -1;
121                 tdb_chainunlock(tdb, key);
122         }
123
124         /* first the non-contended path */
125         if (ret == 0) {
126                 return 0;
127         }
128
129         state = talloc(hdr, struct lock_fetch_state);
130         state->ctdb = ctdb_db->ctdb;
131         state->hdr = hdr;
132         state->recv_pkt = recv_pkt;
133         state->recv_context = recv_context;
134         state->generation = ctdb_db->ctdb->vnn_map->generation;
135         state->ignore_generation = ignore_generation;
136
137         /* now the contended path */
138         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
139         if (h == NULL) {
140                 tdb_chainunlock(tdb, key);
141                 return -1;
142         }
143
144         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
145            so it won't be freed yet */
146         talloc_steal(state, hdr);
147         talloc_steal(state, h);
148
149         /* now tell the caller than we will retry asynchronously */
150         return -2;
151 }
152
153 /*
154   a varient of ctdb_ltdb_lock_requeue that also fetches the record
155  */
156 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
157                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
158                                  struct ctdb_req_header *hdr, TDB_DATA *data,
159                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
160                                  void *recv_context, bool ignore_generation)
161 {
162         int ret;
163
164         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
165                                      recv_context, ignore_generation);
166         if (ret == 0) {
167                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
168                 if (ret != 0) {
169                         ctdb_ltdb_unlock(ctdb_db, key);
170                 }
171         }
172         return ret;
173 }
174
175
176 /*
177   paraoid check to see if the db is empty
178  */
179 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
180 {
181         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
182         int count = tdb_traverse_read(tdb, NULL, NULL);
183         if (count != 0) {
184                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
185                          ctdb_db->db_path));
186                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
187         }
188 }
189
190
191 /*
192   attach to a database, handling both persistent and non-persistent databases
193   return 0 on success, -1 on failure
194  */
195 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
196 {
197         struct ctdb_db_context *ctdb_db, *tmp_db;
198         int ret;
199         struct TDB_DATA key;
200         unsigned tdb_flags;
201
202         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
203         CTDB_NO_MEMORY(ctdb, ctdb_db);
204
205         ctdb_db->ctdb = ctdb;
206         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
207         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
208
209         key.dsize = strlen(db_name)+1;
210         key.dptr  = discard_const(db_name);
211         ctdb_db->db_id = ctdb_hash(&key);
212         ctdb_db->persistent = persistent;
213
214         /* check for hash collisions */
215         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
216                 if (tmp_db->db_id == ctdb_db->db_id) {
217                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
218                                  tmp_db->db_id, db_name, tmp_db->db_name));
219                         talloc_free(ctdb_db);
220                         return -1;
221                 }
222         }
223
224         if (ctdb->db_directory == NULL) {
225                 ctdb->db_directory = VARDIR "/ctdb";
226         }
227
228         /* make sure the db directory exists */
229         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
230                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n", 
231                          ctdb->db_directory));
232                 talloc_free(ctdb_db);
233                 return -1;
234         }
235
236         if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
237                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
238                          ctdb->db_directory_persistent));
239                 talloc_free(ctdb_db);
240                 return -1;
241         }
242
243         /* open the database */
244         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
245                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
246                                            db_name, ctdb->pnn);
247
248         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
249         if (!ctdb->do_setsched) {
250                 tdb_flags |= TDB_NOMMAP;
251         }
252
253         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
254                                       ctdb->tunable.database_hash_size, 
255                                       tdb_flags, 
256                                       O_CREAT|O_RDWR, 0666);
257         if (ctdb_db->ltdb == NULL) {
258                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
259                 talloc_free(ctdb_db);
260                 return -1;
261         }
262
263         if (!persistent) {
264                 ctdb_check_db_empty(ctdb_db);
265         }
266
267         DLIST_ADD(ctdb->db_list, ctdb_db);
268
269         /* setting this can help some high churn databases */
270         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
271
272         /* 
273            all databases support the "null" function. we need this in
274            order to do forced migration of records
275         */
276         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
277         if (ret != 0) {
278                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
279                 talloc_free(ctdb_db);
280                 return -1;
281         }
282
283         /* 
284            all databases support the "fetch" function. we need this
285            for efficient Samba3 ctdb fetch
286         */
287         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
288         if (ret != 0) {
289                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
290                 talloc_free(ctdb_db);
291                 return -1;
292         }
293
294         ret = ctdb_vacuum_init(ctdb_db);
295         if (ret != 0) {
296                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for database '%s'\n", ctdb_db->db_name));
297                 talloc_free(ctdb_db);
298                 return -1;
299         }
300
301
302         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
303         
304         /* success */
305         return 0;
306 }
307
308
309 /*
310   a client has asked to attach a new database
311  */
312 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
313                                TDB_DATA *outdata, uint64_t tdb_flags, 
314                                bool persistent)
315 {
316         const char *db_name = (const char *)indata.dptr;
317         struct ctdb_db_context *db;
318         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
319
320         /* the client can optionally pass additional tdb flags, but we
321            only allow a subset of those on the database in ctdb. Note
322            that tdb_flags is passed in via the (otherwise unused)
323            srvid to the attach control */
324         tdb_flags &= TDB_NOSYNC;
325
326         /* If the node is inactive it is not part of the cluster
327            and we should not allow clients to attach to any
328            databases
329         */
330         if (node->flags & NODE_FLAGS_INACTIVE) {
331                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
332                 return -1;
333         }
334
335
336         /* see if we already have this name */
337         db = ctdb_db_handle(ctdb, db_name);
338         if (db) {
339                 outdata->dptr  = (uint8_t *)&db->db_id;
340                 outdata->dsize = sizeof(db->db_id);
341                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
342                 return 0;
343         }
344
345         if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
346                 return -1;
347         }
348
349         db = ctdb_db_handle(ctdb, db_name);
350         if (!db) {
351                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
352                 return -1;
353         }
354
355         /* remember the flags the client has specified */
356         tdb_add_flags(db->ltdb->tdb, tdb_flags);
357
358         outdata->dptr  = (uint8_t *)&db->db_id;
359         outdata->dsize = sizeof(db->db_id);
360
361         /* tell all the other nodes about this database */
362         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
363                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
364                                                 CTDB_CONTROL_DB_ATTACH,
365                                  0, CTDB_CTRL_FLAG_NOREPLY,
366                                  indata, NULL, NULL);
367
368         /* success */
369         return 0;
370 }
371
372
373 /*
374   attach to all existing persistent databases
375  */
376 int ctdb_attach_persistent(struct ctdb_context *ctdb)
377 {
378         DIR *d;
379         struct dirent *de;
380
381         /* open the persistent db directory and scan it for files */
382         d = opendir(ctdb->db_directory_persistent);
383         if (d == NULL) {
384                 return 0;
385         }
386
387         while ((de=readdir(d))) {
388                 char *p, *s;
389                 size_t len = strlen(de->d_name);
390                 uint32_t node;
391                 
392                 s = talloc_strdup(ctdb, de->d_name);
393                 CTDB_NO_MEMORY(ctdb, s);
394
395                 /* only accept names ending in .tdb */
396                 p = strstr(s, ".tdb.");
397                 if (len < 7 || p == NULL) {
398                         talloc_free(s);
399                         continue;
400                 }
401                 if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
402                         talloc_free(s);
403                         continue;
404                 }
405                 p[4] = 0;
406
407                 if (ctdb_local_attach(ctdb, s, true) != 0) {
408                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
409                         closedir(d);
410                         talloc_free(s);
411                         return -1;
412                 }
413                 DEBUG(DEBUG_NOTICE,("Attached to persistent database %s\n", s));
414
415                 talloc_free(s);
416         }
417         closedir(d);
418         return 0;
419 }
420
421 /*
422   called when a broadcast seqnum update comes in
423  */
424 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
425 {
426         struct ctdb_db_context *ctdb_db;
427         if (srcnode == ctdb->pnn) {
428                 /* don't update ourselves! */
429                 return 0;
430         }
431
432         ctdb_db = find_ctdb_db(ctdb, db_id);
433         if (!ctdb_db) {
434                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
435                 return -1;
436         }
437
438         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
439         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
440         return 0;
441 }
442
443 /*
444   timer to check for seqnum changes in a ltdb and propogate them
445  */
446 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
447                                    struct timeval t, void *p)
448 {
449         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
450         struct ctdb_context *ctdb = ctdb_db->ctdb;
451         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
452         if (new_seqnum != ctdb_db->seqnum) {
453                 /* something has changed - propogate it */
454                 TDB_DATA data;
455                 data.dptr = (uint8_t *)&ctdb_db->db_id;
456                 data.dsize = sizeof(uint32_t);
457                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
458                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
459                                          data, NULL, NULL);             
460         }
461         ctdb_db->seqnum = new_seqnum;
462
463         /* setup a new timer */
464         ctdb_db->te = 
465                 event_add_timed(ctdb->ev, ctdb_db, 
466                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
467                                 ctdb_ltdb_seqnum_check, ctdb_db);
468 }
469
470 /*
471   enable seqnum handling on this db
472  */
473 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
474 {
475         struct ctdb_db_context *ctdb_db;
476         ctdb_db = find_ctdb_db(ctdb, db_id);
477         if (!ctdb_db) {
478                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
479                 return -1;
480         }
481
482         if (ctdb_db->te == NULL) {
483                 ctdb_db->te = 
484                         event_add_timed(ctdb->ev, ctdb_db, 
485                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
486                                         ctdb_ltdb_seqnum_check, ctdb_db);
487         }
488
489         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
490         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
491         return 0;
492 }
493