server: open /var/ctdb/state/persistent_health.tdb.X on startup
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include <ctype.h>
30
31 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
32
/*
  the "null" call handler: it does nothing with the call and always
  reports success. ctdb_local_attach() registers it for each database
  under CTDB_NULL_FUNC.
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* intentionally unused */

	return 0;
}
40
41 /*
42   this is a plain fetch procedure that all databases support
43 */
44 static int ctdb_fetch_func(struct ctdb_call_info *call)
45 {
46         call->reply_data = &call->record_data;
47         return 0;
48 }
49
50
51
/*
  state kept for a request that had to be deferred because its
  chainlock could not be taken immediately; it is handed to
  lock_fetch_callback() when the request should be retried
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn_map generation */
	struct ctdb_context *ctdb;

	/* function the deferred packet is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);

	/* first argument passed to recv_pkt */
	void *recv_context;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;

	/* when true, retry even if the generation has changed since */
	bool ignore_generation;
};
60
61 /*
62   called when we should retry the operation
63  */
64 static void lock_fetch_callback(void *p)
65 {
66         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
67         if (!state->ignore_generation &&
68             state->generation != state->ctdb->vnn_map->generation) {
69                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
70                 talloc_free(state->hdr);
71                 return;
72         }
73         state->recv_pkt(state->recv_context, state->hdr);
74         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
75 }
76
77
78 /*
79   do a non-blocking ltdb_lock, deferring this ctdb request until we
80   have the chainlock
81
82   It does the following:
83
84    1) tries to get the chainlock. If it succeeds, then it returns 0
85
86    2) if it fails to get a chainlock immediately then it sets up a
87    non-blocking chainlock via ctdb_lockwait, and when it gets the
88    chainlock it re-submits this ctdb request to the main packet
89    receive function
90
91    This effectively queues all ctdb requests that cannot be
92    immediately satisfied until it can get the lock. This means that
93    the main ctdb daemon will not block waiting for a chainlock held by
94    a client
95
96    There are 3 possible return values:
97
98        0:    means that it got the lock immediately.
99       -1:    means that it failed to get the lock, and won't retry
100       -2:    means that it failed to get the lock immediately, but will retry
101  */
102 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
103                            TDB_DATA key, struct ctdb_req_header *hdr,
104                            void (*recv_pkt)(void *, struct ctdb_req_header *),
105                            void *recv_context, bool ignore_generation)
106 {
107         int ret;
108         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
109         struct lockwait_handle *h;
110         struct lock_fetch_state *state;
111         
112         ret = tdb_chainlock_nonblock(tdb, key);
113
114         if (ret != 0 &&
115             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
116                 /* a hard failure - don't try again */
117                 return -1;
118         }
119
120         /* when torturing, ensure we test the contended path */
121         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
122             random() % 5 == 0) {
123                 ret = -1;
124                 tdb_chainunlock(tdb, key);
125         }
126
127         /* first the non-contended path */
128         if (ret == 0) {
129                 return 0;
130         }
131
132         state = talloc(hdr, struct lock_fetch_state);
133         state->ctdb = ctdb_db->ctdb;
134         state->hdr = hdr;
135         state->recv_pkt = recv_pkt;
136         state->recv_context = recv_context;
137         state->generation = ctdb_db->ctdb->vnn_map->generation;
138         state->ignore_generation = ignore_generation;
139
140         /* now the contended path */
141         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
142         if (h == NULL) {
143                 tdb_chainunlock(tdb, key);
144                 return -1;
145         }
146
147         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148            so it won't be freed yet */
149         talloc_steal(state, hdr);
150         talloc_steal(state, h);
151
152         /* now tell the caller than we will retry asynchronously */
153         return -2;
154 }
155
156 /*
157   a varient of ctdb_ltdb_lock_requeue that also fetches the record
158  */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
160                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
161                                  struct ctdb_req_header *hdr, TDB_DATA *data,
162                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
163                                  void *recv_context, bool ignore_generation)
164 {
165         int ret;
166
167         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
168                                      recv_context, ignore_generation);
169         if (ret == 0) {
170                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
171                 if (ret != 0) {
172                         ctdb_ltdb_unlock(ctdb_db, key);
173                 }
174         }
175         return ret;
176 }
177
178
179 /*
180   paraoid check to see if the db is empty
181  */
182 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
183 {
184         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
185         int count = tdb_traverse_read(tdb, NULL, NULL);
186         if (count != 0) {
187                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
188                          ctdb_db->db_path));
189                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
190         }
191 }
192
193
194 /*
195   attach to a database, handling both persistent and non-persistent databases
196   return 0 on success, -1 on failure
197  */
198 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
199                              bool persistent, const char *unhealthy_reason)
200 {
201         struct ctdb_db_context *ctdb_db, *tmp_db;
202         int ret;
203         struct TDB_DATA key;
204         unsigned tdb_flags;
205
206         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
207         CTDB_NO_MEMORY(ctdb, ctdb_db);
208
209         ctdb_db->priority = 1;
210         ctdb_db->ctdb = ctdb;
211         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
212         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
213
214         key.dsize = strlen(db_name)+1;
215         key.dptr  = discard_const(db_name);
216         ctdb_db->db_id = ctdb_hash(&key);
217         ctdb_db->persistent = persistent;
218
219         /* check for hash collisions */
220         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
221                 if (tmp_db->db_id == ctdb_db->db_id) {
222                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
223                                  tmp_db->db_id, db_name, tmp_db->db_name));
224                         talloc_free(ctdb_db);
225                         return -1;
226                 }
227         }
228
229         /* open the database */
230         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
231                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
232                                            db_name, ctdb->pnn);
233
234         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
235         if (!ctdb->do_setsched) {
236                 tdb_flags |= TDB_NOMMAP;
237         }
238         tdb_flags |= TDB_DISALLOW_NESTING;
239
240         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
241                                       ctdb->tunable.database_hash_size, 
242                                       tdb_flags, 
243                                       O_CREAT|O_RDWR, 0600);
244         if (ctdb_db->ltdb == NULL) {
245                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
246                 talloc_free(ctdb_db);
247                 return -1;
248         }
249
250         if (!persistent) {
251                 ctdb_check_db_empty(ctdb_db);
252         }
253
254         DLIST_ADD(ctdb->db_list, ctdb_db);
255
256         /* setting this can help some high churn databases */
257         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
258
259         /* 
260            all databases support the "null" function. we need this in
261            order to do forced migration of records
262         */
263         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
264         if (ret != 0) {
265                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
266                 talloc_free(ctdb_db);
267                 return -1;
268         }
269
270         /* 
271            all databases support the "fetch" function. we need this
272            for efficient Samba3 ctdb fetch
273         */
274         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
275         if (ret != 0) {
276                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
277                 talloc_free(ctdb_db);
278                 return -1;
279         }
280
281         ret = ctdb_vacuum_init(ctdb_db);
282         if (ret != 0) {
283                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
284                                   "database '%s'\n", ctdb_db->db_name));
285                 talloc_free(ctdb_db);
286                 return -1;
287         }
288
289
290         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
291         
292         /* success */
293         return 0;
294 }
295
296
297 /*
298   a client has asked to attach a new database
299  */
300 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
301                                TDB_DATA *outdata, uint64_t tdb_flags, 
302                                bool persistent)
303 {
304         const char *db_name = (const char *)indata.dptr;
305         struct ctdb_db_context *db;
306         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
307
308         /* the client can optionally pass additional tdb flags, but we
309            only allow a subset of those on the database in ctdb. Note
310            that tdb_flags is passed in via the (otherwise unused)
311            srvid to the attach control */
312         tdb_flags &= TDB_NOSYNC;
313
314         /* If the node is inactive it is not part of the cluster
315            and we should not allow clients to attach to any
316            databases
317         */
318         if (node->flags & NODE_FLAGS_INACTIVE) {
319                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
320                 return -1;
321         }
322
323
324         /* see if we already have this name */
325         db = ctdb_db_handle(ctdb, db_name);
326         if (db) {
327                 outdata->dptr  = (uint8_t *)&db->db_id;
328                 outdata->dsize = sizeof(db->db_id);
329                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
330                 return 0;
331         }
332
333         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
334                 return -1;
335         }
336
337         db = ctdb_db_handle(ctdb, db_name);
338         if (!db) {
339                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
340                 return -1;
341         }
342
343         /* remember the flags the client has specified */
344         tdb_add_flags(db->ltdb->tdb, tdb_flags);
345
346         outdata->dptr  = (uint8_t *)&db->db_id;
347         outdata->dsize = sizeof(db->db_id);
348
349         /* tell all the other nodes about this database */
350         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
351                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
352                                                 CTDB_CONTROL_DB_ATTACH,
353                                  0, CTDB_CTRL_FLAG_NOREPLY,
354                                  indata, NULL, NULL);
355
356         /* success */
357         return 0;
358 }
359
360
361 /*
362   attach to all existing persistent databases
363  */
364 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
365                                   const char *unhealthy_reason)
366 {
367         DIR *d;
368         struct dirent *de;
369
370         /* open the persistent db directory and scan it for files */
371         d = opendir(ctdb->db_directory_persistent);
372         if (d == NULL) {
373                 return 0;
374         }
375
376         while ((de=readdir(d))) {
377                 char *p, *s, *q;
378                 size_t len = strlen(de->d_name);
379                 uint32_t node;
380                 int invalid_name = 0;
381                 
382                 s = talloc_strdup(ctdb, de->d_name);
383                 CTDB_NO_MEMORY(ctdb, s);
384
385                 /* only accept names ending in .tdb */
386                 p = strstr(s, ".tdb.");
387                 if (len < 7 || p == NULL) {
388                         talloc_free(s);
389                         continue;
390                 }
391
392                 /* only accept names ending with .tdb. and any number of digits */
393                 q = p+5;
394                 while (*q != 0 && invalid_name == 0) {
395                         if (!isdigit(*q++)) {
396                                 invalid_name = 1;
397                         }
398                 }
399                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
400                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
401                         talloc_free(s);
402                         continue;
403                 }
404                 p[4] = 0;
405
406                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
407                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
408                         closedir(d);
409                         talloc_free(s);
410                         return -1;
411                 }
412
413                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
414
415                 talloc_free(s);
416         }
417         closedir(d);
418         return 0;
419 }
420
421 int ctdb_attach_databases(struct ctdb_context *ctdb)
422 {
423         int ret;
424         char *persistent_health_path = NULL;
425         char *unhealthy_reason = NULL;
426         bool first_try = true;
427
428         if (ctdb->db_directory == NULL) {
429                 ctdb->db_directory = VARDIR "/ctdb";
430         }
431         if (ctdb->db_directory_persistent == NULL) {
432                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
433         }
434         if (ctdb->db_directory_state == NULL) {
435                 ctdb->db_directory_state = VARDIR "/ctdb/state";
436         }
437
438         /* make sure the db directory exists */
439         ret = mkdir(ctdb->db_directory, 0700);
440         if (ret == -1 && errno != EEXIST) {
441                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
442                          ctdb->db_directory));
443                 return -1;
444         }
445
446         /* make sure the persistent db directory exists */
447         ret = mkdir(ctdb->db_directory_persistent, 0700);
448         if (ret == -1 && errno != EEXIST) {
449                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
450                          ctdb->db_directory_persistent));
451                 return -1;
452         }
453
454         /* make sure the internal state db directory exists */
455         ret = mkdir(ctdb->db_directory_state, 0700);
456         if (ret == -1 && errno != EEXIST) {
457                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
458                          ctdb->db_directory_state));
459                 return -1;
460         }
461
462         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
463                                                  ctdb->db_directory_state,
464                                                  PERSISTENT_HEALTH_TDB,
465                                                  ctdb->pnn);
466         if (persistent_health_path == NULL) {
467                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
468                 return -1;
469         }
470
471 again:
472
473         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
474                                                    0, TDB_DISALLOW_NESTING,
475                                                    O_CREAT | O_RDWR, 0600);
476         if (ctdb->db_persistent_health == NULL) {
477                 struct tdb_wrap *tdb;
478
479                 if (!first_try) {
480                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
481                                           persistent_health_path,
482                                           errno,
483                                           strerror(errno)));
484                         talloc_free(persistent_health_path);
485                         talloc_free(unhealthy_reason);
486                         return -1;
487                 }
488                 first_try = false;
489
490                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
491                                                    persistent_health_path,
492                                                    "was cleared after a failure",
493                                                    "manual verification needed");
494                 if (unhealthy_reason == NULL) {
495                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
496                         talloc_free(persistent_health_path);
497                         return -1;
498                 }
499
500                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
501                                   persistent_health_path));
502                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
503                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
504                                     O_CREAT | O_RDWR, 0600);
505                 if (tdb) {
506                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
507                                           persistent_health_path,
508                                           errno,
509                                           strerror(errno)));
510                         talloc_free(persistent_health_path);
511                         talloc_free(unhealthy_reason);
512                         return -1;
513                 }
514
515                 talloc_free(tdb);
516                 goto again;
517         }
518         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
519         if (ret != 0) {
520                 struct tdb_wrap *tdb;
521
522                 talloc_free(ctdb->db_persistent_health);
523                 ctdb->db_persistent_health = NULL;
524
525                 if (!first_try) {
526                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
527                                           persistent_health_path));
528                         talloc_free(persistent_health_path);
529                         talloc_free(unhealthy_reason);
530                         return -1;
531                 }
532                 first_try = false;
533
534                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
535                                                    persistent_health_path,
536                                                    "was cleared after a failure",
537                                                    "manual verification needed");
538                 if (unhealthy_reason == NULL) {
539                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
540                         talloc_free(persistent_health_path);
541                         return -1;
542                 }
543
544                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
545                                   persistent_health_path));
546                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
547                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
548                                     O_CREAT | O_RDWR, 0600);
549                 if (tdb) {
550                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
551                                           persistent_health_path,
552                                           errno,
553                                           strerror(errno)));
554                         talloc_free(persistent_health_path);
555                         talloc_free(unhealthy_reason);
556                         return -1;
557                 }
558
559                 talloc_free(tdb);
560                 goto again;
561         }
562         talloc_free(persistent_health_path);
563
564         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
565         talloc_free(unhealthy_reason);
566         if (ret != 0) {
567                 return ret;
568         }
569
570         return 0;
571 }
572
573 /*
574   called when a broadcast seqnum update comes in
575  */
576 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
577 {
578         struct ctdb_db_context *ctdb_db;
579         if (srcnode == ctdb->pnn) {
580                 /* don't update ourselves! */
581                 return 0;
582         }
583
584         ctdb_db = find_ctdb_db(ctdb, db_id);
585         if (!ctdb_db) {
586                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
587                 return -1;
588         }
589
590         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
591         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
592         return 0;
593 }
594
595 /*
596   timer to check for seqnum changes in a ltdb and propogate them
597  */
598 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
599                                    struct timeval t, void *p)
600 {
601         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
602         struct ctdb_context *ctdb = ctdb_db->ctdb;
603         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
604         if (new_seqnum != ctdb_db->seqnum) {
605                 /* something has changed - propogate it */
606                 TDB_DATA data;
607                 data.dptr = (uint8_t *)&ctdb_db->db_id;
608                 data.dsize = sizeof(uint32_t);
609                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
610                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
611                                          data, NULL, NULL);             
612         }
613         ctdb_db->seqnum = new_seqnum;
614
615         /* setup a new timer */
616         ctdb_db->seqnum_update =
617                 event_add_timed(ctdb->ev, ctdb_db, 
618                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
619                                 ctdb_ltdb_seqnum_check, ctdb_db);
620 }
621
622 /*
623   enable seqnum handling on this db
624  */
625 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
626 {
627         struct ctdb_db_context *ctdb_db;
628         ctdb_db = find_ctdb_db(ctdb, db_id);
629         if (!ctdb_db) {
630                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
631                 return -1;
632         }
633
634         if (ctdb_db->seqnum_update == NULL) {
635                 ctdb_db->seqnum_update =
636                         event_add_timed(ctdb->ev, ctdb_db, 
637                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
638                                         ctdb_ltdb_seqnum_check, ctdb_db);
639         }
640
641         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
642         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
643         return 0;
644 }
645
646 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
647 {
648         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
649         struct ctdb_db_context *ctdb_db;
650
651         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
652         if (!ctdb_db) {
653                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
654                 return -1;
655         }
656
657         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
658                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
659                 return -1;
660         }
661
662         ctdb_db->priority = db_prio->priority;
663         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
664
665         return 0;
666 }
667
668