merge from tridge
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30   this is the dummy null procedure that all databases support
31 */
32 static int ctdb_null_func(struct ctdb_call_info *call)
33 {
34         return 0;
35 }
36
37 /*
38   this is a plain fetch procedure that all databases support
39 */
40 static int ctdb_fetch_func(struct ctdb_call_info *call)
41 {
42         call->reply_data = &call->record_data;
43         return 0;
44 }
45
46
47
48 struct lock_fetch_state {
49         struct ctdb_context *ctdb;
50         void (*recv_pkt)(void *, struct ctdb_req_header *);
51         void *recv_context;
52         struct ctdb_req_header *hdr;
53         uint32_t generation;
54         bool ignore_generation;
55 };
56
57 /*
58   called when we should retry the operation
59  */
60 static void lock_fetch_callback(void *p)
61 {
62         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
63         if (!state->ignore_generation &&
64             state->generation != state->ctdb->vnn_map->generation) {
65                 DEBUG(0,("Discarding previous generation lockwait packet\n"));
66                 talloc_free(state->hdr);
67                 return;
68         }
69         state->recv_pkt(state->recv_context, state->hdr);
70         DEBUG(2,(__location__ " PACKET REQUEUED\n"));
71 }
72
73
74 /*
75   do a non-blocking ltdb_lock, deferring this ctdb request until we
76   have the chainlock
77
78   It does the following:
79
80    1) tries to get the chainlock. If it succeeds, then it returns 0
81
82    2) if it fails to get a chainlock immediately then it sets up a
83    non-blocking chainlock via ctdb_lockwait, and when it gets the
84    chainlock it re-submits this ctdb request to the main packet
85    receive function
86
87    This effectively queues all ctdb requests that cannot be
88    immediately satisfied until it can get the lock. This means that
89    the main ctdb daemon will not block waiting for a chainlock held by
90    a client
91
92    There are 3 possible return values:
93
94        0:    means that it got the lock immediately.
95       -1:    means that it failed to get the lock, and won't retry
96       -2:    means that it failed to get the lock immediately, but will retry
97  */
98 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
99                            TDB_DATA key, struct ctdb_req_header *hdr,
100                            void (*recv_pkt)(void *, struct ctdb_req_header *),
101                            void *recv_context, bool ignore_generation)
102 {
103         int ret;
104         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
105         struct lockwait_handle *h;
106         struct lock_fetch_state *state;
107         
108         ret = tdb_chainlock_nonblock(tdb, key);
109
110         if (ret != 0 &&
111             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
112                 /* a hard failure - don't try again */
113                 return -1;
114         }
115
116         /* when torturing, ensure we test the contended path */
117         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
118             random() % 5 == 0) {
119                 ret = -1;
120                 tdb_chainunlock(tdb, key);
121         }
122
123         /* first the non-contended path */
124         if (ret == 0) {
125                 return 0;
126         }
127
128         state = talloc(hdr, struct lock_fetch_state);
129         state->ctdb = ctdb_db->ctdb;
130         state->hdr = hdr;
131         state->recv_pkt = recv_pkt;
132         state->recv_context = recv_context;
133         state->generation = ctdb_db->ctdb->vnn_map->generation;
134         state->ignore_generation = ignore_generation;
135
136         /* now the contended path */
137         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
138         if (h == NULL) {
139                 tdb_chainunlock(tdb, key);
140                 return -1;
141         }
142
143         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
144            so it won't be freed yet */
145         talloc_steal(state, hdr);
146         talloc_steal(state, h);
147
148         /* now tell the caller than we will retry asynchronously */
149         return -2;
150 }
151
152 /*
153   a varient of ctdb_ltdb_lock_requeue that also fetches the record
154  */
155 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
156                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
157                                  struct ctdb_req_header *hdr, TDB_DATA *data,
158                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
159                                  void *recv_context, bool ignore_generation)
160 {
161         int ret;
162
163         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
164                                      recv_context, ignore_generation);
165         if (ret == 0) {
166                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
167                 if (ret != 0) {
168                         ctdb_ltdb_unlock(ctdb_db, key);
169                 }
170         }
171         return ret;
172 }
173
174
175 /*
176   paraoid check to see if the db is empty
177  */
178 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
179 {
180         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
181         int count = tdb_traverse_read(tdb, NULL, NULL);
182         if (count != 0) {
183                 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
184                          ctdb_db->db_path));
185                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
186         }
187 }
188
189 /*
190   a client has asked to attach a new database
191  */
192 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
193                                TDB_DATA *outdata)
194 {
195         const char *db_name = (const char *)indata.dptr;
196         struct ctdb_db_context *ctdb_db, *tmp_db;
197         int ret;
198
199         /* see if we already have this name */
200         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
201                 if (strcmp(db_name, tmp_db->db_name) == 0) {
202                         /* this is not an error */
203                         outdata->dptr  = (uint8_t *)&tmp_db->db_id;
204                         outdata->dsize = sizeof(tmp_db->db_id);
205                         return 0;
206                 }
207         }
208
209         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
210         CTDB_NO_MEMORY(ctdb, ctdb_db);
211
212         ctdb_db->ctdb = ctdb;
213         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
214         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
215
216         ctdb_db->db_id = ctdb_hash(&indata);
217
218         outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
219         outdata->dsize = sizeof(ctdb_db->db_id);
220
221         /* check for hash collisions */
222         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
223                 if (tmp_db->db_id == ctdb_db->db_id) {
224                         DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
225                                  tmp_db->db_id, db_name, tmp_db->db_name));
226                         talloc_free(ctdb_db);
227                         return -1;
228                 }
229         }
230
231         if (ctdb->db_directory == NULL) {
232                 ctdb->db_directory = VARDIR "/ctdb";
233         }
234
235         /* make sure the db directory exists */
236         if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
237                 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
238                          ctdb->db_directory));
239                 talloc_free(ctdb_db);
240                 return -1;
241         }
242
243         /* open the database */
244         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
245                                            ctdb->db_directory, 
246                                            db_name, ctdb->vnn);
247
248         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
249                                       ctdb->tunable.database_hash_size, 
250                                       TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
251         if (ctdb_db->ltdb == NULL) {
252                 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
253                 talloc_free(ctdb_db);
254                 return -1;
255         }
256
257         ctdb_check_db_empty(ctdb_db);
258
259         DLIST_ADD(ctdb->db_list, ctdb_db);
260
261         /* 
262            all databases support the "null" function. we need this in
263            order to do forced migration of records
264         */
265         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
266         if (ret != 0) {
267                 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
268                 talloc_free(ctdb_db);
269                 return -1;
270         }
271
272         /* 
273            all databases support the "fetch" function. we need this
274            for efficient Samba3 ctdb fetch
275         */
276         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
277         if (ret != 0) {
278                 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
279                 talloc_free(ctdb_db);
280                 return -1;
281         }
282         
283         /* tell all the other nodes about this database */
284         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
285                                  CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
286                                  indata, NULL, NULL);
287
288         DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
289
290         /* success */
291         return 0;
292 }
293
294 /*
295   called when a broadcast seqnum update comes in
296  */
297 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
298 {
299         struct ctdb_db_context *ctdb_db;
300         if (srcnode == ctdb->vnn) {
301                 /* don't update ourselves! */
302                 return 0;
303         }
304
305         ctdb_db = find_ctdb_db(ctdb, db_id);
306         if (!ctdb_db) {
307                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
308                 return -1;
309         }
310
311         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
312         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
313         return 0;
314 }
315
316 /*
317   timer to check for seqnum changes in a ltdb and propogate them
318  */
319 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
320                                    struct timeval t, void *p)
321 {
322         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
323         struct ctdb_context *ctdb = ctdb_db->ctdb;
324         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
325         if (new_seqnum != ctdb_db->seqnum) {
326                 /* something has changed - propogate it */
327                 TDB_DATA data;
328                 data.dptr = (uint8_t *)&ctdb_db->db_id;
329                 data.dsize = sizeof(uint32_t);
330                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
331                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
332                                          data, NULL, NULL);             
333         }
334         ctdb_db->seqnum = new_seqnum;
335
336         /* setup a new timer */
337         ctdb_db->te = 
338                 event_add_timed(ctdb->ev, ctdb_db, 
339                                 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
340                                 ctdb_ltdb_seqnum_check, ctdb_db);
341 }
342
343 /*
344   enable seqnum handling on this db
345  */
346 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
347 {
348         struct ctdb_db_context *ctdb_db;
349         ctdb_db = find_ctdb_db(ctdb, db_id);
350         if (!ctdb_db) {
351                 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
352                 return -1;
353         }
354
355         if (ctdb_db->te == NULL) {
356                 ctdb_db->te = 
357                         event_add_timed(ctdb->ev, ctdb_db, 
358                                         timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
359                                         ctdb_ltdb_seqnum_check, ctdb_db);
360         }
361
362         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
363         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
364         return 0;
365 }
366