add a control to set a database priority. Let newly created databases default to...
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29
30 /*
31   this is the dummy null procedure that all databases support
32 */
33 static int ctdb_null_func(struct ctdb_call_info *call)
34 {
35         return 0;
36 }
37
38 /*
39   this is a plain fetch procedure that all databases support
40 */
41 static int ctdb_fetch_func(struct ctdb_call_info *call)
42 {
43         call->reply_data = &call->record_data;
44         return 0;
45 }
46
47
48
49 struct lock_fetch_state {
50         struct ctdb_context *ctdb;
51         void (*recv_pkt)(void *, struct ctdb_req_header *);
52         void *recv_context;
53         struct ctdb_req_header *hdr;
54         uint32_t generation;
55         bool ignore_generation;
56 };
57
58 /*
59   called when we should retry the operation
60  */
61 static void lock_fetch_callback(void *p)
62 {
63         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
64         if (!state->ignore_generation &&
65             state->generation != state->ctdb->vnn_map->generation) {
66                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
67                 talloc_free(state->hdr);
68                 return;
69         }
70         state->recv_pkt(state->recv_context, state->hdr);
71         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
72 }
73
74
75 /*
76   do a non-blocking ltdb_lock, deferring this ctdb request until we
77   have the chainlock
78
79   It does the following:
80
81    1) tries to get the chainlock. If it succeeds, then it returns 0
82
83    2) if it fails to get a chainlock immediately then it sets up a
84    non-blocking chainlock via ctdb_lockwait, and when it gets the
85    chainlock it re-submits this ctdb request to the main packet
86    receive function
87
88    This effectively queues all ctdb requests that cannot be
89    immediately satisfied until it can get the lock. This means that
90    the main ctdb daemon will not block waiting for a chainlock held by
91    a client
92
93    There are 3 possible return values:
94
95        0:    means that it got the lock immediately.
96       -1:    means that it failed to get the lock, and won't retry
97       -2:    means that it failed to get the lock immediately, but will retry
98  */
99 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
100                            TDB_DATA key, struct ctdb_req_header *hdr,
101                            void (*recv_pkt)(void *, struct ctdb_req_header *),
102                            void *recv_context, bool ignore_generation)
103 {
104         int ret;
105         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
106         struct lockwait_handle *h;
107         struct lock_fetch_state *state;
108         
109         ret = tdb_chainlock_nonblock(tdb, key);
110
111         if (ret != 0 &&
112             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
113                 /* a hard failure - don't try again */
114                 return -1;
115         }
116
117         /* when torturing, ensure we test the contended path */
118         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
119             random() % 5 == 0) {
120                 ret = -1;
121                 tdb_chainunlock(tdb, key);
122         }
123
124         /* first the non-contended path */
125         if (ret == 0) {
126                 return 0;
127         }
128
129         state = talloc(hdr, struct lock_fetch_state);
130         state->ctdb = ctdb_db->ctdb;
131         state->hdr = hdr;
132         state->recv_pkt = recv_pkt;
133         state->recv_context = recv_context;
134         state->generation = ctdb_db->ctdb->vnn_map->generation;
135         state->ignore_generation = ignore_generation;
136
137         /* now the contended path */
138         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
139         if (h == NULL) {
140                 tdb_chainunlock(tdb, key);
141                 return -1;
142         }
143
144         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
145            so it won't be freed yet */
146         talloc_steal(state, hdr);
147         talloc_steal(state, h);
148
149         /* now tell the caller than we will retry asynchronously */
150         return -2;
151 }
152
153 /*
154   a varient of ctdb_ltdb_lock_requeue that also fetches the record
155  */
156 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
157                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
158                                  struct ctdb_req_header *hdr, TDB_DATA *data,
159                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
160                                  void *recv_context, bool ignore_generation)
161 {
162         int ret;
163
164         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
165                                      recv_context, ignore_generation);
166         if (ret == 0) {
167                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
168                 if (ret != 0) {
169                         ctdb_ltdb_unlock(ctdb_db, key);
170                 }
171         }
172         return ret;
173 }
174
175
176 /*
177   paraoid check to see if the db is empty
178  */
179 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
180 {
181         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
182         int count = tdb_traverse_read(tdb, NULL, NULL);
183         if (count != 0) {
184                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
185                          ctdb_db->db_path));
186                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
187         }
188 }
189
190
191 /*
192   attach to a database, handling both persistent and non-persistent databases
193   return 0 on success, -1 on failure
194  */
195 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
196 {
197         struct ctdb_db_context *ctdb_db, *tmp_db;
198         int ret;
199         struct TDB_DATA key;
200         unsigned tdb_flags;
201
202         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
203         CTDB_NO_MEMORY(ctdb, ctdb_db);
204
205         ctdb_db->priority = 1;
206         ctdb_db->ctdb = ctdb;
207         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
208         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
209
210         key.dsize = strlen(db_name)+1;
211         key.dptr  = discard_const(db_name);
212         ctdb_db->db_id = ctdb_hash(&key);
213         ctdb_db->persistent = persistent;
214
215         /* check for hash collisions */
216         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
217                 if (tmp_db->db_id == ctdb_db->db_id) {
218                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
219                                  tmp_db->db_id, db_name, tmp_db->db_name));
220                         talloc_free(ctdb_db);
221                         return -1;
222                 }
223         }
224
225         if (ctdb->db_directory == NULL) {
226                 ctdb->db_directory = VARDIR "/ctdb";
227         }
228
229         /* make sure the db directory exists */
230         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
231                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n", 
232                          ctdb->db_directory));
233                 talloc_free(ctdb_db);
234                 return -1;
235         }
236
237         if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
238                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
239                          ctdb->db_directory_persistent));
240                 talloc_free(ctdb_db);
241                 return -1;
242         }
243
244         /* open the database */
245         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
246                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
247                                            db_name, ctdb->pnn);
248
249         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
250         if (!ctdb->do_setsched) {
251                 tdb_flags |= TDB_NOMMAP;
252         }
253
254         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
255                                       ctdb->tunable.database_hash_size, 
256                                       tdb_flags, 
257                                       O_CREAT|O_RDWR, 0666);
258         if (ctdb_db->ltdb == NULL) {
259                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
260                 talloc_free(ctdb_db);
261                 return -1;
262         }
263
264         if (!persistent) {
265                 ctdb_check_db_empty(ctdb_db);
266         }
267
268         DLIST_ADD(ctdb->db_list, ctdb_db);
269
270         /* setting this can help some high churn databases */
271         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
272
273         /* 
274            all databases support the "null" function. we need this in
275            order to do forced migration of records
276         */
277         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
278         if (ret != 0) {
279                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
280                 talloc_free(ctdb_db);
281                 return -1;
282         }
283
284         /* 
285            all databases support the "fetch" function. we need this
286            for efficient Samba3 ctdb fetch
287         */
288         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
289         if (ret != 0) {
290                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
291                 talloc_free(ctdb_db);
292                 return -1;
293         }
294
295         ret = ctdb_vacuum_init(ctdb_db);
296         if (ret != 0) {
297                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for database '%s'\n", ctdb_db->db_name));
298                 talloc_free(ctdb_db);
299                 return -1;
300         }
301
302
303         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
304         
305         /* success */
306         return 0;
307 }
308
309
310 /*
311   a client has asked to attach a new database
312  */
313 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
314                                TDB_DATA *outdata, uint64_t tdb_flags, 
315                                bool persistent)
316 {
317         const char *db_name = (const char *)indata.dptr;
318         struct ctdb_db_context *db;
319         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
320
321         /* the client can optionally pass additional tdb flags, but we
322            only allow a subset of those on the database in ctdb. Note
323            that tdb_flags is passed in via the (otherwise unused)
324            srvid to the attach control */
325         tdb_flags &= TDB_NOSYNC;
326
327         /* If the node is inactive it is not part of the cluster
328            and we should not allow clients to attach to any
329            databases
330         */
331         if (node->flags & NODE_FLAGS_INACTIVE) {
332                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
333                 return -1;
334         }
335
336
337         /* see if we already have this name */
338         db = ctdb_db_handle(ctdb, db_name);
339         if (db) {
340                 outdata->dptr  = (uint8_t *)&db->db_id;
341                 outdata->dsize = sizeof(db->db_id);
342                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
343                 return 0;
344         }
345
346         if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
347                 return -1;
348         }
349
350         db = ctdb_db_handle(ctdb, db_name);
351         if (!db) {
352                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
353                 return -1;
354         }
355
356         /* remember the flags the client has specified */
357         tdb_add_flags(db->ltdb->tdb, tdb_flags);
358
359         outdata->dptr  = (uint8_t *)&db->db_id;
360         outdata->dsize = sizeof(db->db_id);
361
362         /* tell all the other nodes about this database */
363         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
364                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
365                                                 CTDB_CONTROL_DB_ATTACH,
366                                  0, CTDB_CTRL_FLAG_NOREPLY,
367                                  indata, NULL, NULL);
368
369         /* success */
370         return 0;
371 }
372
373
374 /*
375   attach to all existing persistent databases
376  */
377 int ctdb_attach_persistent(struct ctdb_context *ctdb)
378 {
379         DIR *d;
380         struct dirent *de;
381
382         /* open the persistent db directory and scan it for files */
383         d = opendir(ctdb->db_directory_persistent);
384         if (d == NULL) {
385                 return 0;
386         }
387
388         while ((de=readdir(d))) {
389                 char *p, *s;
390                 size_t len = strlen(de->d_name);
391                 uint32_t node;
392                 
393                 s = talloc_strdup(ctdb, de->d_name);
394                 CTDB_NO_MEMORY(ctdb, s);
395
396                 /* ignore names ending in .bak */
397                 p = strstr(s, ".bak");
398                 if (p != NULL) {
399                         continue;
400                 }
401
402                 /* only accept names ending in .tdb */
403                 p = strstr(s, ".tdb.");
404                 if (len < 7 || p == NULL) {
405                         talloc_free(s);
406                         continue;
407                 }
408                 if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
409                         talloc_free(s);
410                         continue;
411                 }
412                 p[4] = 0;
413
414                 if (ctdb_local_attach(ctdb, s, true) != 0) {
415                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
416                         closedir(d);
417                         talloc_free(s);
418                         return -1;
419                 }
420                 DEBUG(DEBUG_NOTICE,("Attached to persistent database %s\n", s));
421
422                 talloc_free(s);
423         }
424         closedir(d);
425         return 0;
426 }
427
428 /*
429   called when a broadcast seqnum update comes in
430  */
431 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
432 {
433         struct ctdb_db_context *ctdb_db;
434         if (srcnode == ctdb->pnn) {
435                 /* don't update ourselves! */
436                 return 0;
437         }
438
439         ctdb_db = find_ctdb_db(ctdb, db_id);
440         if (!ctdb_db) {
441                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
442                 return -1;
443         }
444
445         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
446         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
447         return 0;
448 }
449
450 /*
451   timer to check for seqnum changes in a ltdb and propogate them
452  */
453 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
454                                    struct timeval t, void *p)
455 {
456         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
457         struct ctdb_context *ctdb = ctdb_db->ctdb;
458         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
459         if (new_seqnum != ctdb_db->seqnum) {
460                 /* something has changed - propogate it */
461                 TDB_DATA data;
462                 data.dptr = (uint8_t *)&ctdb_db->db_id;
463                 data.dsize = sizeof(uint32_t);
464                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
465                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
466                                          data, NULL, NULL);             
467         }
468         ctdb_db->seqnum = new_seqnum;
469
470         /* setup a new timer */
471         ctdb_db->te = 
472                 event_add_timed(ctdb->ev, ctdb_db, 
473                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
474                                 ctdb_ltdb_seqnum_check, ctdb_db);
475 }
476
477 /*
478   enable seqnum handling on this db
479  */
480 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
481 {
482         struct ctdb_db_context *ctdb_db;
483         ctdb_db = find_ctdb_db(ctdb, db_id);
484         if (!ctdb_db) {
485                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
486                 return -1;
487         }
488
489         if (ctdb_db->te == NULL) {
490                 ctdb_db->te = 
491                         event_add_timed(ctdb->ev, ctdb_db, 
492                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
493                                         ctdb_ltdb_seqnum_check, ctdb_db);
494         }
495
496         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
497         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
498         return 0;
499 }
500
501 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
502 {
503         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
504         struct ctdb_db_context *ctdb_db;
505
506         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
507         if (!ctdb_db) {
508                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
509                 return -1;
510         }
511
512         ctdb_db->priority = db_prio->priority;
513         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
514
515         return 0;
516 }
517
518