ctdb: pass TDB_DISALLOW_NESTING to all tdb_open/tdb_wrap_open calls
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include <ctype.h>
30
31 /*
32   this is the dummy null procedure that all databases support
33 */
34 static int ctdb_null_func(struct ctdb_call_info *call)
35 {
36         return 0;
37 }
38
39 /*
40   this is a plain fetch procedure that all databases support
41 */
42 static int ctdb_fetch_func(struct ctdb_call_info *call)
43 {
44         call->reply_data = &call->record_data;
45         return 0;
46 }
47
48
49
50 struct lock_fetch_state {
51         struct ctdb_context *ctdb;
52         void (*recv_pkt)(void *, struct ctdb_req_header *);
53         void *recv_context;
54         struct ctdb_req_header *hdr;
55         uint32_t generation;
56         bool ignore_generation;
57 };
58
59 /*
60   called when we should retry the operation
61  */
62 static void lock_fetch_callback(void *p)
63 {
64         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
65         if (!state->ignore_generation &&
66             state->generation != state->ctdb->vnn_map->generation) {
67                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
68                 talloc_free(state->hdr);
69                 return;
70         }
71         state->recv_pkt(state->recv_context, state->hdr);
72         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
73 }
74
75
76 /*
77   do a non-blocking ltdb_lock, deferring this ctdb request until we
78   have the chainlock
79
80   It does the following:
81
82    1) tries to get the chainlock. If it succeeds, then it returns 0
83
84    2) if it fails to get a chainlock immediately then it sets up a
85    non-blocking chainlock via ctdb_lockwait, and when it gets the
86    chainlock it re-submits this ctdb request to the main packet
87    receive function
88
89    This effectively queues all ctdb requests that cannot be
90    immediately satisfied until it can get the lock. This means that
91    the main ctdb daemon will not block waiting for a chainlock held by
92    a client
93
94    There are 3 possible return values:
95
96        0:    means that it got the lock immediately.
97       -1:    means that it failed to get the lock, and won't retry
98       -2:    means that it failed to get the lock immediately, but will retry
99  */
100 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
101                            TDB_DATA key, struct ctdb_req_header *hdr,
102                            void (*recv_pkt)(void *, struct ctdb_req_header *),
103                            void *recv_context, bool ignore_generation)
104 {
105         int ret;
106         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
107         struct lockwait_handle *h;
108         struct lock_fetch_state *state;
109         
110         ret = tdb_chainlock_nonblock(tdb, key);
111
112         if (ret != 0 &&
113             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
114                 /* a hard failure - don't try again */
115                 return -1;
116         }
117
118         /* when torturing, ensure we test the contended path */
119         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
120             random() % 5 == 0) {
121                 ret = -1;
122                 tdb_chainunlock(tdb, key);
123         }
124
125         /* first the non-contended path */
126         if (ret == 0) {
127                 return 0;
128         }
129
130         state = talloc(hdr, struct lock_fetch_state);
131         state->ctdb = ctdb_db->ctdb;
132         state->hdr = hdr;
133         state->recv_pkt = recv_pkt;
134         state->recv_context = recv_context;
135         state->generation = ctdb_db->ctdb->vnn_map->generation;
136         state->ignore_generation = ignore_generation;
137
138         /* now the contended path */
139         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
140         if (h == NULL) {
141                 tdb_chainunlock(tdb, key);
142                 return -1;
143         }
144
145         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
146            so it won't be freed yet */
147         talloc_steal(state, hdr);
148         talloc_steal(state, h);
149
150         /* now tell the caller than we will retry asynchronously */
151         return -2;
152 }
153
154 /*
155   a varient of ctdb_ltdb_lock_requeue that also fetches the record
156  */
157 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
158                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
159                                  struct ctdb_req_header *hdr, TDB_DATA *data,
160                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
161                                  void *recv_context, bool ignore_generation)
162 {
163         int ret;
164
165         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
166                                      recv_context, ignore_generation);
167         if (ret == 0) {
168                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
169                 if (ret != 0) {
170                         ctdb_ltdb_unlock(ctdb_db, key);
171                 }
172         }
173         return ret;
174 }
175
176
177 /*
178   paraoid check to see if the db is empty
179  */
180 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
181 {
182         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
183         int count = tdb_traverse_read(tdb, NULL, NULL);
184         if (count != 0) {
185                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
186                          ctdb_db->db_path));
187                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
188         }
189 }
190
191
192 /*
193   attach to a database, handling both persistent and non-persistent databases
194   return 0 on success, -1 on failure
195  */
196 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
197 {
198         struct ctdb_db_context *ctdb_db, *tmp_db;
199         int ret;
200         struct TDB_DATA key;
201         unsigned tdb_flags;
202
203         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
204         CTDB_NO_MEMORY(ctdb, ctdb_db);
205
206         ctdb_db->priority = 1;
207         ctdb_db->ctdb = ctdb;
208         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
209         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
210
211         key.dsize = strlen(db_name)+1;
212         key.dptr  = discard_const(db_name);
213         ctdb_db->db_id = ctdb_hash(&key);
214         ctdb_db->persistent = persistent;
215
216         /* check for hash collisions */
217         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
218                 if (tmp_db->db_id == ctdb_db->db_id) {
219                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
220                                  tmp_db->db_id, db_name, tmp_db->db_name));
221                         talloc_free(ctdb_db);
222                         return -1;
223                 }
224         }
225
226         if (ctdb->db_directory == NULL) {
227                 ctdb->db_directory = VARDIR "/ctdb";
228         }
229
230         /* make sure the db directory exists */
231         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
232                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n", 
233                          ctdb->db_directory));
234                 talloc_free(ctdb_db);
235                 return -1;
236         }
237
238         if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
239                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
240                          ctdb->db_directory_persistent));
241                 talloc_free(ctdb_db);
242                 return -1;
243         }
244
245         /* open the database */
246         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
247                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
248                                            db_name, ctdb->pnn);
249
250         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
251         if (!ctdb->do_setsched) {
252                 tdb_flags |= TDB_NOMMAP;
253         }
254         tdb_flags |= TDB_DISALLOW_NESTING;
255
256         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
257                                       ctdb->tunable.database_hash_size, 
258                                       tdb_flags, 
259                                       O_CREAT|O_RDWR, 0666);
260         if (ctdb_db->ltdb == NULL) {
261                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
262                 talloc_free(ctdb_db);
263                 return -1;
264         }
265
266         if (!persistent) {
267                 ctdb_check_db_empty(ctdb_db);
268         }
269
270         DLIST_ADD(ctdb->db_list, ctdb_db);
271
272         /* setting this can help some high churn databases */
273         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
274
275         /* 
276            all databases support the "null" function. we need this in
277            order to do forced migration of records
278         */
279         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
280         if (ret != 0) {
281                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
282                 talloc_free(ctdb_db);
283                 return -1;
284         }
285
286         /* 
287            all databases support the "fetch" function. we need this
288            for efficient Samba3 ctdb fetch
289         */
290         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
291         if (ret != 0) {
292                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
293                 talloc_free(ctdb_db);
294                 return -1;
295         }
296
297         ret = ctdb_vacuum_init(ctdb_db);
298         if (ret != 0) {
299                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
300                                   "database '%s'\n", ctdb_db->db_name));
301                 talloc_free(ctdb_db);
302                 return -1;
303         }
304
305
306         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
307         
308         /* success */
309         return 0;
310 }
311
312
313 /*
314   a client has asked to attach a new database
315  */
316 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
317                                TDB_DATA *outdata, uint64_t tdb_flags, 
318                                bool persistent)
319 {
320         const char *db_name = (const char *)indata.dptr;
321         struct ctdb_db_context *db;
322         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
323
324         /* the client can optionally pass additional tdb flags, but we
325            only allow a subset of those on the database in ctdb. Note
326            that tdb_flags is passed in via the (otherwise unused)
327            srvid to the attach control */
328         tdb_flags &= TDB_NOSYNC;
329
330         /* If the node is inactive it is not part of the cluster
331            and we should not allow clients to attach to any
332            databases
333         */
334         if (node->flags & NODE_FLAGS_INACTIVE) {
335                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
336                 return -1;
337         }
338
339
340         /* see if we already have this name */
341         db = ctdb_db_handle(ctdb, db_name);
342         if (db) {
343                 outdata->dptr  = (uint8_t *)&db->db_id;
344                 outdata->dsize = sizeof(db->db_id);
345                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
346                 return 0;
347         }
348
349         if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
350                 return -1;
351         }
352
353         db = ctdb_db_handle(ctdb, db_name);
354         if (!db) {
355                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
356                 return -1;
357         }
358
359         /* remember the flags the client has specified */
360         tdb_add_flags(db->ltdb->tdb, tdb_flags);
361
362         outdata->dptr  = (uint8_t *)&db->db_id;
363         outdata->dsize = sizeof(db->db_id);
364
365         /* tell all the other nodes about this database */
366         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
367                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
368                                                 CTDB_CONTROL_DB_ATTACH,
369                                  0, CTDB_CTRL_FLAG_NOREPLY,
370                                  indata, NULL, NULL);
371
372         /* success */
373         return 0;
374 }
375
376
377 /*
378   attach to all existing persistent databases
379  */
380 int ctdb_attach_persistent(struct ctdb_context *ctdb)
381 {
382         DIR *d;
383         struct dirent *de;
384
385         /* open the persistent db directory and scan it for files */
386         d = opendir(ctdb->db_directory_persistent);
387         if (d == NULL) {
388                 return 0;
389         }
390
391         while ((de=readdir(d))) {
392                 char *p, *s, *q;
393                 size_t len = strlen(de->d_name);
394                 uint32_t node;
395                 int invalid_name = 0;
396                 
397                 s = talloc_strdup(ctdb, de->d_name);
398                 CTDB_NO_MEMORY(ctdb, s);
399
400                 /* only accept names ending in .tdb */
401                 p = strstr(s, ".tdb.");
402                 if (len < 7 || p == NULL) {
403                         talloc_free(s);
404                         continue;
405                 }
406
407                 /* only accept names ending with .tdb. and any number of digits */
408                 q = p+5;
409                 while (*q != 0 && invalid_name == 0) {
410                         if (!isdigit(*q++)) {
411                                 invalid_name = 1;
412                         }
413                 }
414                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
415                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
416                         talloc_free(s);
417                         continue;
418                 }
419                 p[4] = 0;
420
421                 if (ctdb_local_attach(ctdb, s, true) != 0) {
422                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
423                         closedir(d);
424                         talloc_free(s);
425                         return -1;
426                 }
427                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
428
429                 talloc_free(s);
430         }
431         closedir(d);
432         return 0;
433 }
434
435 /*
436   called when a broadcast seqnum update comes in
437  */
438 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
439 {
440         struct ctdb_db_context *ctdb_db;
441         if (srcnode == ctdb->pnn) {
442                 /* don't update ourselves! */
443                 return 0;
444         }
445
446         ctdb_db = find_ctdb_db(ctdb, db_id);
447         if (!ctdb_db) {
448                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
449                 return -1;
450         }
451
452         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
453         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
454         return 0;
455 }
456
457 /*
458   timer to check for seqnum changes in a ltdb and propogate them
459  */
460 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
461                                    struct timeval t, void *p)
462 {
463         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
464         struct ctdb_context *ctdb = ctdb_db->ctdb;
465         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
466         if (new_seqnum != ctdb_db->seqnum) {
467                 /* something has changed - propogate it */
468                 TDB_DATA data;
469                 data.dptr = (uint8_t *)&ctdb_db->db_id;
470                 data.dsize = sizeof(uint32_t);
471                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
472                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
473                                          data, NULL, NULL);             
474         }
475         ctdb_db->seqnum = new_seqnum;
476
477         /* setup a new timer */
478         ctdb_db->seqnum_update =
479                 event_add_timed(ctdb->ev, ctdb_db, 
480                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
481                                 ctdb_ltdb_seqnum_check, ctdb_db);
482 }
483
484 /*
485   enable seqnum handling on this db
486  */
487 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
488 {
489         struct ctdb_db_context *ctdb_db;
490         ctdb_db = find_ctdb_db(ctdb, db_id);
491         if (!ctdb_db) {
492                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
493                 return -1;
494         }
495
496         if (ctdb_db->seqnum_update == NULL) {
497                 ctdb_db->seqnum_update =
498                         event_add_timed(ctdb->ev, ctdb_db, 
499                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
500                                         ctdb_ltdb_seqnum_check, ctdb_db);
501         }
502
503         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
504         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
505         return 0;
506 }
507
508 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
509 {
510         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
511         struct ctdb_db_context *ctdb_db;
512
513         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
514         if (!ctdb_db) {
515                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
516                 return -1;
517         }
518
519         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
520                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
521                 return -1;
522         }
523
524         ctdb_db->priority = db_prio->priority;
525         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
526
527         return 0;
528 }
529
530