ctdb_ltdb_store_server: always store the data when ctdb_ltdb_store() is called from...
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
/*
  Dummy "null" call procedure that all databases support.

  It ignores the call description entirely and always reports
  success (0).
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* intentionally unused */

	return 0;
}
42
43 /*
44   this is a plain fetch procedure that all databases support
45 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 {
48         call->reply_data = &call->record_data;
49         return 0;
50 }
51
52
53 /**
54  * write a record to a normal database
55  *
56  * This is the server-variant of the ctdb_ltdb_store function.
57  * It contains logic to determine whether a record should be
58  * stored or deleted.
59  */
60 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
61                                   TDB_DATA key,
62                                   struct ctdb_ltdb_header *header,
63                                   TDB_DATA data)
64 {
65         struct ctdb_context *ctdb = ctdb_db->ctdb;
66         TDB_DATA rec;
67         int ret;
68         bool seqnum_suppressed = false;
69         bool keep = false;
70         uint32_t lmaster;
71
72         if (ctdb->flags & CTDB_FLAG_TORTURE) {
73                 struct ctdb_ltdb_header *h2;
74                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
75                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
76                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
77                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
78                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
79                 }
80                 if (rec.dptr) free(rec.dptr);
81         }
82
83         if (ctdb->vnn_map == NULL) {
84                 /*
85                  * Called from a client: always store the record
86                  * Also don't call ctdb_lmaster since it uses the vnn_map!
87                  */
88                 keep = true;
89                 goto store;
90         }
91
92         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
93
94         /*
95          * If we migrate an empty record off to another node
96          * and the record has not been migrated with data,
97          * delete the record instead of storing the empty record.
98          */
99         if (data.dsize != 0) {
100                 keep = true;
101         } else if (ctdb_db->persistent) {
102                 keep = true;
103         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
104                 keep = true;
105         } else if (ctdb_db->ctdb->pnn == lmaster) {
106                 /*
107                  * If we are lmaster, then we usually keep the record.
108                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
109                  * and the record is empty and has never been migrated
110                  * with data, then we should delete it instead of storing it.
111                  * This is part of the vacuuming process.
112                  *
113                  * The reason that we usually need to store even empty records
114                  * on the lmaster is that a client operating directly on the
115                  * lmaster (== dmaster) expects the local copy of the record to
116                  * exist after successful ctdb migrate call. If the record does
117                  * not exist, the client goes into a migrate loop and eventually
118                  * fails. So storing the empty record makes sure that we do not
119                  * need to change the client code.
120                  */
121                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
122                         keep = true;
123                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
124                         keep = true;
125                 }
126         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
127                 keep = true;
128         }
129
130 store:
131         /*
132          * The VACUUM_MIGRATED flag is only set temporarily for
133          * the above logic when the record was retrieved by a
134          * VACUUM_MIGRATE call and should not be stored in the
135          * database.
136          *
137          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
138          * and there are two cases in which the corresponding record
139          * is stored in the local database:
140          * 1. The record has been migrated with data in the past
141          *    (the MIGRATED_WITH_DATA record flag is set).
142          * 2. The record has been filled with data again since it
143          *    had been submitted in the VACUUM_FETCH message to the
144          *    lmaster.
145          * For such records it is important to not store the
146          * VACUUM_MIGRATED flag in the database.
147          */
148         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
149
150         rec.dsize = sizeof(*header) + data.dsize;
151         rec.dptr = talloc_size(ctdb, rec.dsize);
152         CTDB_NO_MEMORY(ctdb, rec.dptr);
153
154         memcpy(rec.dptr, header, sizeof(*header));
155         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
156
157         /* Databases with seqnum updates enabled only get their seqnum
158            changes when/if we modify the data */
159         if (ctdb_db->seqnum_update != NULL) {
160                 TDB_DATA old;
161                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
162
163                 if ( (old.dsize == rec.dsize)
164                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
165                           rec.dptr+sizeof(struct ctdb_ltdb_header),
166                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
167                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
168                         seqnum_suppressed = true;
169                 }
170                 if (old.dptr) free(old.dptr);
171         }
172
173         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
174                             ctdb_db->db_name,
175                             keep?"storing":"deleting",
176                             ctdb_hash(&key)));
177
178         if (keep) {
179                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
180         } else {
181                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
182         }
183
184         if (ret != 0) {
185                 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
186         }
187         if (seqnum_suppressed) {
188                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
189         }
190
191         talloc_free(rec.dptr);
192
193         return ret;
194 }
195
/*
  State kept for a request that could not get its chainlock
  immediately and is waiting to be re-submitted by
  lock_fetch_callback().
*/
struct lock_fetch_state {
	/* daemon context; its vnn_map generation is compared on retry */
	struct ctdb_context *ctdb;
	/* function the deferred packet is handed back to on retry */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* opaque first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation at the time the request was queued */
	uint32_t generation;
	/* if true, retry even when the generation has changed meanwhile */
	bool ignore_generation;
};
204
205 /*
206   called when we should retry the operation
207  */
208 static void lock_fetch_callback(void *p)
209 {
210         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
211         if (!state->ignore_generation &&
212             state->generation != state->ctdb->vnn_map->generation) {
213                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
214                 talloc_free(state->hdr);
215                 return;
216         }
217         state->recv_pkt(state->recv_context, state->hdr);
218         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
219 }
220
221
222 /*
223   do a non-blocking ltdb_lock, deferring this ctdb request until we
224   have the chainlock
225
226   It does the following:
227
228    1) tries to get the chainlock. If it succeeds, then it returns 0
229
230    2) if it fails to get a chainlock immediately then it sets up a
231    non-blocking chainlock via ctdb_lockwait, and when it gets the
232    chainlock it re-submits this ctdb request to the main packet
233    receive function
234
235    This effectively queues all ctdb requests that cannot be
236    immediately satisfied until it can get the lock. This means that
237    the main ctdb daemon will not block waiting for a chainlock held by
238    a client
239
240    There are 3 possible return values:
241
242        0:    means that it got the lock immediately.
243       -1:    means that it failed to get the lock, and won't retry
244       -2:    means that it failed to get the lock immediately, but will retry
245  */
246 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
247                            TDB_DATA key, struct ctdb_req_header *hdr,
248                            void (*recv_pkt)(void *, struct ctdb_req_header *),
249                            void *recv_context, bool ignore_generation)
250 {
251         int ret;
252         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
253         struct lockwait_handle *h;
254         struct lock_fetch_state *state;
255         
256         ret = tdb_chainlock_nonblock(tdb, key);
257
258         if (ret != 0 &&
259             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
260                 /* a hard failure - don't try again */
261                 return -1;
262         }
263
264         /* when torturing, ensure we test the contended path */
265         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
266             random() % 5 == 0) {
267                 ret = -1;
268                 tdb_chainunlock(tdb, key);
269         }
270
271         /* first the non-contended path */
272         if (ret == 0) {
273                 return 0;
274         }
275
276         state = talloc(hdr, struct lock_fetch_state);
277         state->ctdb = ctdb_db->ctdb;
278         state->hdr = hdr;
279         state->recv_pkt = recv_pkt;
280         state->recv_context = recv_context;
281         state->generation = ctdb_db->ctdb->vnn_map->generation;
282         state->ignore_generation = ignore_generation;
283
284         /* now the contended path */
285         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
286         if (h == NULL) {
287                 return -1;
288         }
289
290         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
291            so it won't be freed yet */
292         talloc_steal(state, hdr);
293         talloc_steal(state, h);
294
295         /* now tell the caller than we will retry asynchronously */
296         return -2;
297 }
298
299 /*
300   a varient of ctdb_ltdb_lock_requeue that also fetches the record
301  */
302 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
303                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
304                                  struct ctdb_req_header *hdr, TDB_DATA *data,
305                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
306                                  void *recv_context, bool ignore_generation)
307 {
308         int ret;
309
310         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
311                                      recv_context, ignore_generation);
312         if (ret == 0) {
313                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
314                 if (ret != 0) {
315                         int uret;
316                         uret = ctdb_ltdb_unlock(ctdb_db, key);
317                         if (uret != 0) {
318                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
319                         }
320                 }
321         }
322         return ret;
323 }
324
325
326 /*
327   paraoid check to see if the db is empty
328  */
329 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
330 {
331         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
332         int count = tdb_traverse_read(tdb, NULL, NULL);
333         if (count != 0) {
334                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
335                          ctdb_db->db_path));
336                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
337         }
338 }
339
340 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
341                                 struct ctdb_db_context *ctdb_db)
342 {
343         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
344         char *old;
345         char *reason = NULL;
346         TDB_DATA key;
347         TDB_DATA val;
348
349         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
350         key.dsize = strlen(ctdb_db->db_name);
351
352         old = ctdb_db->unhealthy_reason;
353         ctdb_db->unhealthy_reason = NULL;
354
355         val = tdb_fetch(tdb, key);
356         if (val.dsize > 0) {
357                 reason = talloc_strndup(ctdb_db,
358                                         (const char *)val.dptr,
359                                         val.dsize);
360                 if (reason == NULL) {
361                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
362                                            (int)val.dsize));
363                         ctdb_db->unhealthy_reason = old;
364                         free(val.dptr);
365                         return -1;
366                 }
367         }
368
369         if (val.dptr) {
370                 free(val.dptr);
371         }
372
373         talloc_free(old);
374         ctdb_db->unhealthy_reason = reason;
375         return 0;
376 }
377
378 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
379                                   struct ctdb_db_context *ctdb_db,
380                                   const char *given_reason,/* NULL means healthy */
381                                   int num_healthy_nodes)
382 {
383         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
384         int ret;
385         TDB_DATA key;
386         TDB_DATA val;
387         char *new_reason = NULL;
388         char *old_reason = NULL;
389
390         ret = tdb_transaction_start(tdb);
391         if (ret != 0) {
392                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
393                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
394                 return -1;
395         }
396
397         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
398         if (ret != 0) {
399                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
400                                    ctdb_db->db_name, ret));
401                 return -1;
402         }
403         old_reason = ctdb_db->unhealthy_reason;
404
405         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
406         key.dsize = strlen(ctdb_db->db_name);
407
408         if (given_reason) {
409                 new_reason = talloc_strdup(ctdb_db, given_reason);
410                 if (new_reason == NULL) {
411                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
412                                           given_reason));
413                         return -1;
414                 }
415         } else if (old_reason && num_healthy_nodes == 0) {
416                 /*
417                  * If the reason indicates ok, but there where no healthy nodes
418                  * available, that it means, we have not recovered valid content
419                  * of the db. So if there's an old reason, prefix it with
420                  * "NO-HEALTHY-NODES - "
421                  */
422                 const char *prefix;
423
424 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
425                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
426                 if (ret != 0) {
427                         prefix = _TMP_PREFIX;
428                 } else {
429                         prefix = "";
430                 }
431                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
432                                          prefix, old_reason);
433                 if (new_reason == NULL) {
434                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
435                                           prefix, old_reason));
436                         return -1;
437                 }
438 #undef _TMP_PREFIX
439         }
440
441         if (new_reason) {
442                 val.dptr = discard_const_p(uint8_t, new_reason);
443                 val.dsize = strlen(new_reason);
444
445                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
446                 if (ret != 0) {
447                         tdb_transaction_cancel(tdb);
448                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
449                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
450                                            ret, tdb_errorstr(tdb)));
451                         talloc_free(new_reason);
452                         return -1;
453                 }
454                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
455                                    ctdb_db->db_name, new_reason));
456         } else if (old_reason) {
457                 ret = tdb_delete(tdb, key);
458                 if (ret != 0) {
459                         tdb_transaction_cancel(tdb);
460                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
461                                            tdb_name(tdb), ctdb_db->db_name,
462                                            ret, tdb_errorstr(tdb)));
463                         talloc_free(new_reason);
464                         return -1;
465                 }
466                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
467                                    ctdb_db->db_name));
468         }
469
470         ret = tdb_transaction_commit(tdb);
471         if (ret != TDB_SUCCESS) {
472                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
473                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
474                 talloc_free(new_reason);
475                 return -1;
476         }
477
478         talloc_free(old_reason);
479         ctdb_db->unhealthy_reason = new_reason;
480
481         return 0;
482 }
483
484 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
485                                      struct ctdb_db_context *ctdb_db)
486 {
487         time_t now = time(NULL);
488         char *new_path;
489         char *new_reason;
490         int ret;
491         struct tm *tm;
492
493         tm = gmtime(&now);
494
495         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
496         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
497                                    "%04u%02u%02u%02u%02u%02u.0Z",
498                                    ctdb_db->db_path,
499                                    tm->tm_year+1900, tm->tm_mon+1,
500                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
501                                    tm->tm_sec);
502         if (new_path == NULL) {
503                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
504                 return -1;
505         }
506
507         new_reason = talloc_asprintf(ctdb_db,
508                                      "ERROR - Backup of corrupted TDB in '%s'",
509                                      new_path);
510         if (new_reason == NULL) {
511                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
512                 return -1;
513         }
514         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
515         talloc_free(new_reason);
516         if (ret != 0) {
517                 DEBUG(DEBUG_CRIT,(__location__
518                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
519                                  ctdb_db->db_path));
520                 return -1;
521         }
522
523         ret = rename(ctdb_db->db_path, new_path);
524         if (ret != 0) {
525                 DEBUG(DEBUG_CRIT,(__location__
526                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
527                                   ctdb_db->db_path, new_path,
528                                   errno, strerror(errno)));
529                 talloc_free(new_path);
530                 return -1;
531         }
532
533         DEBUG(DEBUG_CRIT,(__location__
534                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
535                          ctdb_db->db_path, new_path));
536         talloc_free(new_path);
537         return 0;
538 }
539
540 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
541 {
542         struct ctdb_db_context *ctdb_db;
543         int ret;
544         int ok = 0;
545         int fail = 0;
546
547         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
548                 if (!ctdb_db->persistent) {
549                         continue;
550                 }
551
552                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
553                 if (ret != 0) {
554                         DEBUG(DEBUG_ALERT,(__location__
555                                            " load persistent health for '%s' failed\n",
556                                            ctdb_db->db_path));
557                         return -1;
558                 }
559
560                 if (ctdb_db->unhealthy_reason == NULL) {
561                         ok++;
562                         DEBUG(DEBUG_INFO,(__location__
563                                    " persistent db '%s' healthy\n",
564                                    ctdb_db->db_path));
565                         continue;
566                 }
567
568                 fail++;
569                 DEBUG(DEBUG_ALERT,(__location__
570                                    " persistent db '%s' unhealthy: %s\n",
571                                    ctdb_db->db_path,
572                                    ctdb_db->unhealthy_reason));
573         }
574         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
575               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
576                ok, fail));
577
578         if (fail != 0) {
579                 return -1;
580         }
581
582         return 0;
583 }
584
585
586 /*
587   mark a database - as healthy
588  */
589 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
590 {
591         uint32_t db_id = *(uint32_t *)indata.dptr;
592         struct ctdb_db_context *ctdb_db;
593         int ret;
594         bool may_recover = false;
595
596         ctdb_db = find_ctdb_db(ctdb, db_id);
597         if (!ctdb_db) {
598                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
599                 return -1;
600         }
601
602         if (ctdb_db->unhealthy_reason) {
603                 may_recover = true;
604         }
605
606         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
607         if (ret != 0) {
608                 DEBUG(DEBUG_ERR,(__location__
609                                  " ctdb_update_persistent_health(%s) failed\n",
610                                  ctdb_db->db_name));
611                 return -1;
612         }
613
614         if (may_recover && !ctdb->done_startup) {
615                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
616                                   ctdb_db->db_name));
617                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
618         }
619
620         return 0;
621 }
622
623 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
624                                    TDB_DATA indata,
625                                    TDB_DATA *outdata)
626 {
627         uint32_t db_id = *(uint32_t *)indata.dptr;
628         struct ctdb_db_context *ctdb_db;
629         int ret;
630
631         ctdb_db = find_ctdb_db(ctdb, db_id);
632         if (!ctdb_db) {
633                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
634                 return -1;
635         }
636
637         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
638         if (ret != 0) {
639                 DEBUG(DEBUG_ERR,(__location__
640                                  " ctdb_load_persistent_health(%s) failed\n",
641                                  ctdb_db->db_name));
642                 return -1;
643         }
644
645         *outdata = tdb_null;
646         if (ctdb_db->unhealthy_reason) {
647                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
648                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
649         }
650
651         return 0;
652 }
653
654 /*
655   attach to a database, handling both persistent and non-persistent databases
656   return 0 on success, -1 on failure
657  */
658 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
659                              bool persistent, const char *unhealthy_reason,
660                              bool jenkinshash)
661 {
662         struct ctdb_db_context *ctdb_db, *tmp_db;
663         int ret;
664         struct TDB_DATA key;
665         unsigned tdb_flags;
666         int mode = 0600;
667         int remaining_tries = 0;
668
669         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
670         CTDB_NO_MEMORY(ctdb, ctdb_db);
671
672         ctdb_db->priority = 1;
673         ctdb_db->ctdb = ctdb;
674         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
675         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
676
677         key.dsize = strlen(db_name)+1;
678         key.dptr  = discard_const(db_name);
679         ctdb_db->db_id = ctdb_hash(&key);
680         ctdb_db->persistent = persistent;
681
682         if (!ctdb_db->persistent) {
683                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
684                 if (ctdb_db->delete_queue == NULL) {
685                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
686                 }
687
688                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
689         }
690
691         /* check for hash collisions */
692         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
693                 if (tmp_db->db_id == ctdb_db->db_id) {
694                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
695                                  tmp_db->db_id, db_name, tmp_db->db_name));
696                         talloc_free(ctdb_db);
697                         return -1;
698                 }
699         }
700
701         if (persistent) {
702                 if (unhealthy_reason) {
703                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
704                                                             unhealthy_reason, 0);
705                         if (ret != 0) {
706                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
707                                                    ctdb_db->db_name, unhealthy_reason, ret));
708                                 talloc_free(ctdb_db);
709                                 return -1;
710                         }
711                 }
712
713                 if (ctdb->max_persistent_check_errors > 0) {
714                         remaining_tries = 1;
715                 }
716                 if (ctdb->done_startup) {
717                         remaining_tries = 0;
718                 }
719
720                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
721                 if (ret != 0) {
722                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
723                                    ctdb_db->db_name, ret));
724                         talloc_free(ctdb_db);
725                         return -1;
726                 }
727         }
728
729         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
730                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
731                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
732                 talloc_free(ctdb_db);
733                 return -1;
734         }
735
736         if (ctdb_db->unhealthy_reason) {
737                 /* this is just a warning, but we want that in the log file! */
738                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
739                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
740         }
741
742         /* open the database */
743         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
744                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
745                                            db_name, ctdb->pnn);
746
747         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
748         if (ctdb->valgrinding) {
749                 tdb_flags |= TDB_NOMMAP;
750         }
751         tdb_flags |= TDB_DISALLOW_NESTING;
752         if (jenkinshash) {
753                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
754         }
755
756 again:
757         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
758                                       ctdb->tunable.database_hash_size, 
759                                       tdb_flags, 
760                                       O_CREAT|O_RDWR, mode);
761         if (ctdb_db->ltdb == NULL) {
762                 struct stat st;
763                 int saved_errno = errno;
764
765                 if (!persistent) {
766                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
767                                           ctdb_db->db_path,
768                                           saved_errno,
769                                           strerror(saved_errno)));
770                         talloc_free(ctdb_db);
771                         return -1;
772                 }
773
774                 if (remaining_tries == 0) {
775                         DEBUG(DEBUG_CRIT,(__location__
776                                           "Failed to open persistent tdb '%s': %d - %s\n",
777                                           ctdb_db->db_path,
778                                           saved_errno,
779                                           strerror(saved_errno)));
780                         talloc_free(ctdb_db);
781                         return -1;
782                 }
783
784                 ret = stat(ctdb_db->db_path, &st);
785                 if (ret != 0) {
786                         DEBUG(DEBUG_CRIT,(__location__
787                                           "Failed to open persistent tdb '%s': %d - %s\n",
788                                           ctdb_db->db_path,
789                                           saved_errno,
790                                           strerror(saved_errno)));
791                         talloc_free(ctdb_db);
792                         return -1;
793                 }
794
795                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
796                 if (ret != 0) {
797                         DEBUG(DEBUG_CRIT,(__location__
798                                           "Failed to open persistent tdb '%s': %d - %s\n",
799                                           ctdb_db->db_path,
800                                           saved_errno,
801                                           strerror(saved_errno)));
802                         talloc_free(ctdb_db);
803                         return -1;
804                 }
805
806                 remaining_tries--;
807                 mode = st.st_mode;
808                 goto again;
809         }
810
811         if (!persistent) {
812                 ctdb_check_db_empty(ctdb_db);
813         } else {
814                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
815                 if (ret != 0) {
816                         int fd;
817                         struct stat st;
818
819                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
820                                           ctdb_db->db_path, ret,
821                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
822                         if (remaining_tries == 0) {
823                                 talloc_free(ctdb_db);
824                                 return -1;
825                         }
826
827                         fd = tdb_fd(ctdb_db->ltdb->tdb);
828                         ret = fstat(fd, &st);
829                         if (ret != 0) {
830                                 DEBUG(DEBUG_CRIT,(__location__
831                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
832                                                   ctdb_db->db_path,
833                                                   errno,
834                                                   strerror(errno)));
835                                 talloc_free(ctdb_db);
836                                 return -1;
837                         }
838
839                         /* close the TDB */
840                         talloc_free(ctdb_db->ltdb);
841                         ctdb_db->ltdb = NULL;
842
843                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
844                         if (ret != 0) {
845                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
846                                                   ctdb_db->db_path));
847                                 talloc_free(ctdb_db);
848                                 return -1;
849                         }
850
851                         remaining_tries--;
852                         mode = st.st_mode;
853                         goto again;
854                 }
855         }
856
857         DLIST_ADD(ctdb->db_list, ctdb_db);
858
859         /* setting this can help some high churn databases */
860         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
861
862         /* 
863            all databases support the "null" function. we need this in
864            order to do forced migration of records
865         */
866         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
867         if (ret != 0) {
868                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
869                 talloc_free(ctdb_db);
870                 return -1;
871         }
872
873         /* 
874            all databases support the "fetch" function. we need this
875            for efficient Samba3 ctdb fetch
876         */
877         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
878         if (ret != 0) {
879                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
880                 talloc_free(ctdb_db);
881                 return -1;
882         }
883
884         ret = ctdb_vacuum_init(ctdb_db);
885         if (ret != 0) {
886                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
887                                   "database '%s'\n", ctdb_db->db_name));
888                 talloc_free(ctdb_db);
889                 return -1;
890         }
891
892
893         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
894         
895         /* success */
896         return 0;
897 }
898
899
/*
  state kept for a db-attach control whose handling has been postponed.
  Entries are chained on the deferred_attach list of the ctdb context
  via the next/prev links.
*/
struct ctdb_deferred_attach_context {
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;
        /* context whose deferred_attach list this entry lives on */
        struct ctdb_context *ctdb;
        /* the postponed control request packet */
        struct ctdb_req_control *c;
};
905
906
907 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
908 {
909         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
910
911         return 0;
912 }
913
914 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
915 {
916         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
917         struct ctdb_context *ctdb = da_ctx->ctdb;
918
919         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
920         talloc_free(da_ctx);
921 }
922
923 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
924 {
925         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
926         struct ctdb_context *ctdb = da_ctx->ctdb;
927
928         /* This talloc-steals the packet ->c */
929         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
930         talloc_free(da_ctx);
931 }
932
933 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
934 {
935         struct ctdb_deferred_attach_context *da_ctx;
936
937         /* call it from the main event loop as soon as the current event 
938            finishes.
939          */
940         while ((da_ctx = ctdb->deferred_attach) != NULL) {
941                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
942                 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
943         }
944
945         return 0;
946 }
947
948 /*
949   a client has asked to attach a new database
950  */
951 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
952                                TDB_DATA *outdata, uint64_t tdb_flags, 
953                                bool persistent, uint32_t client_id,
954                                struct ctdb_req_control *c,
955                                bool *async_reply)
956 {
957         const char *db_name = (const char *)indata.dptr;
958         struct ctdb_db_context *db;
959         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
960         struct ctdb_client *client = NULL;
961
962         /* dont allow any local clients to attach while we are in recovery mode
963          * except for the recovery daemon.
964          * allow all attach from the network since these are always from remote
965          * recovery daemons.
966          */
967         if (client_id != 0) {
968                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
969         }
970         if (client != NULL) {
971                 /* If the node is inactive it is not part of the cluster
972                    and we should not allow clients to attach to any
973                    databases
974                 */
975                 if (node->flags & NODE_FLAGS_INACTIVE) {
976                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
977                         return -1;
978                 }
979
980                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
981                  && client->pid != ctdb->recoverd_pid) {
982                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
983
984                         if (da_ctx == NULL) {
985                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
986                                 return -1;
987                         }
988
989                         da_ctx->ctdb = ctdb;
990                         da_ctx->c = talloc_steal(da_ctx, c);
991                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
992                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
993
994                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
995
996                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
997                         *async_reply = true;
998                         return 0;
999                 }
1000         }
1001
1002         /* the client can optionally pass additional tdb flags, but we
1003            only allow a subset of those on the database in ctdb. Note
1004            that tdb_flags is passed in via the (otherwise unused)
1005            srvid to the attach control */
1006         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1007
1008         /* see if we already have this name */
1009         db = ctdb_db_handle(ctdb, db_name);
1010         if (db) {
1011                 outdata->dptr  = (uint8_t *)&db->db_id;
1012                 outdata->dsize = sizeof(db->db_id);
1013                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1014                 return 0;
1015         }
1016
1017         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1018                 return -1;
1019         }
1020
1021         db = ctdb_db_handle(ctdb, db_name);
1022         if (!db) {
1023                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1024                 return -1;
1025         }
1026
1027         /* remember the flags the client has specified */
1028         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1029
1030         outdata->dptr  = (uint8_t *)&db->db_id;
1031         outdata->dsize = sizeof(db->db_id);
1032
1033         /* Try to ensure it's locked in mem */
1034         ctdb_lockdown_memory(ctdb);
1035
1036         /* tell all the other nodes about this database */
1037         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1038                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1039                                                 CTDB_CONTROL_DB_ATTACH,
1040                                  0, CTDB_CTRL_FLAG_NOREPLY,
1041                                  indata, NULL, NULL);
1042
1043         /* success */
1044         return 0;
1045 }
1046
1047
1048 /*
1049   attach to all existing persistent databases
1050  */
1051 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1052                                   const char *unhealthy_reason)
1053 {
1054         DIR *d;
1055         struct dirent *de;
1056
1057         /* open the persistent db directory and scan it for files */
1058         d = opendir(ctdb->db_directory_persistent);
1059         if (d == NULL) {
1060                 return 0;
1061         }
1062
1063         while ((de=readdir(d))) {
1064                 char *p, *s, *q;
1065                 size_t len = strlen(de->d_name);
1066                 uint32_t node;
1067                 int invalid_name = 0;
1068                 
1069                 s = talloc_strdup(ctdb, de->d_name);
1070                 CTDB_NO_MEMORY(ctdb, s);
1071
1072                 /* only accept names ending in .tdb */
1073                 p = strstr(s, ".tdb.");
1074                 if (len < 7 || p == NULL) {
1075                         talloc_free(s);
1076                         continue;
1077                 }
1078
1079                 /* only accept names ending with .tdb. and any number of digits */
1080                 q = p+5;
1081                 while (*q != 0 && invalid_name == 0) {
1082                         if (!isdigit(*q++)) {
1083                                 invalid_name = 1;
1084                         }
1085                 }
1086                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1087                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1088                         talloc_free(s);
1089                         continue;
1090                 }
1091                 p[4] = 0;
1092
1093                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1094                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1095                         closedir(d);
1096                         talloc_free(s);
1097                         return -1;
1098                 }
1099
1100                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1101
1102                 talloc_free(s);
1103         }
1104         closedir(d);
1105         return 0;
1106 }
1107
1108 int ctdb_attach_databases(struct ctdb_context *ctdb)
1109 {
1110         int ret;
1111         char *persistent_health_path = NULL;
1112         char *unhealthy_reason = NULL;
1113         bool first_try = true;
1114
1115         if (ctdb->db_directory == NULL) {
1116                 ctdb->db_directory = VARDIR "/ctdb";
1117         }
1118         if (ctdb->db_directory_persistent == NULL) {
1119                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1120         }
1121         if (ctdb->db_directory_state == NULL) {
1122                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1123         }
1124
1125         /* make sure the db directory exists */
1126         ret = mkdir(ctdb->db_directory, 0700);
1127         if (ret == -1 && errno != EEXIST) {
1128                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1129                          ctdb->db_directory));
1130                 return -1;
1131         }
1132
1133         /* make sure the persistent db directory exists */
1134         ret = mkdir(ctdb->db_directory_persistent, 0700);
1135         if (ret == -1 && errno != EEXIST) {
1136                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1137                          ctdb->db_directory_persistent));
1138                 return -1;
1139         }
1140
1141         /* make sure the internal state db directory exists */
1142         ret = mkdir(ctdb->db_directory_state, 0700);
1143         if (ret == -1 && errno != EEXIST) {
1144                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1145                          ctdb->db_directory_state));
1146                 return -1;
1147         }
1148
1149         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1150                                                  ctdb->db_directory_state,
1151                                                  PERSISTENT_HEALTH_TDB,
1152                                                  ctdb->pnn);
1153         if (persistent_health_path == NULL) {
1154                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1155                 return -1;
1156         }
1157
1158 again:
1159
1160         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1161                                                    0, TDB_DISALLOW_NESTING,
1162                                                    O_CREAT | O_RDWR, 0600);
1163         if (ctdb->db_persistent_health == NULL) {
1164                 struct tdb_wrap *tdb;
1165
1166                 if (!first_try) {
1167                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1168                                           persistent_health_path,
1169                                           errno,
1170                                           strerror(errno)));
1171                         talloc_free(persistent_health_path);
1172                         talloc_free(unhealthy_reason);
1173                         return -1;
1174                 }
1175                 first_try = false;
1176
1177                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1178                                                    persistent_health_path,
1179                                                    "was cleared after a failure",
1180                                                    "manual verification needed");
1181                 if (unhealthy_reason == NULL) {
1182                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1183                         talloc_free(persistent_health_path);
1184                         return -1;
1185                 }
1186
1187                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1188                                   persistent_health_path));
1189                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1190                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1191                                     O_CREAT | O_RDWR, 0600);
1192                 if (tdb) {
1193                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1194                                           persistent_health_path,
1195                                           errno,
1196                                           strerror(errno)));
1197                         talloc_free(persistent_health_path);
1198                         talloc_free(unhealthy_reason);
1199                         return -1;
1200                 }
1201
1202                 talloc_free(tdb);
1203                 goto again;
1204         }
1205         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1206         if (ret != 0) {
1207                 struct tdb_wrap *tdb;
1208
1209                 talloc_free(ctdb->db_persistent_health);
1210                 ctdb->db_persistent_health = NULL;
1211
1212                 if (!first_try) {
1213                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1214                                           persistent_health_path));
1215                         talloc_free(persistent_health_path);
1216                         talloc_free(unhealthy_reason);
1217                         return -1;
1218                 }
1219                 first_try = false;
1220
1221                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1222                                                    persistent_health_path,
1223                                                    "was cleared after a failure",
1224                                                    "manual verification needed");
1225                 if (unhealthy_reason == NULL) {
1226                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1227                         talloc_free(persistent_health_path);
1228                         return -1;
1229                 }
1230
1231                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1232                                   persistent_health_path));
1233                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1234                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1235                                     O_CREAT | O_RDWR, 0600);
1236                 if (tdb) {
1237                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1238                                           persistent_health_path,
1239                                           errno,
1240                                           strerror(errno)));
1241                         talloc_free(persistent_health_path);
1242                         talloc_free(unhealthy_reason);
1243                         return -1;
1244                 }
1245
1246                 talloc_free(tdb);
1247                 goto again;
1248         }
1249         talloc_free(persistent_health_path);
1250
1251         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1252         talloc_free(unhealthy_reason);
1253         if (ret != 0) {
1254                 return ret;
1255         }
1256
1257         return 0;
1258 }
1259
1260 /*
1261   called when a broadcast seqnum update comes in
1262  */
1263 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1264 {
1265         struct ctdb_db_context *ctdb_db;
1266         if (srcnode == ctdb->pnn) {
1267                 /* don't update ourselves! */
1268                 return 0;
1269         }
1270
1271         ctdb_db = find_ctdb_db(ctdb, db_id);
1272         if (!ctdb_db) {
1273                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1274                 return -1;
1275         }
1276
1277         if (ctdb_db->unhealthy_reason) {
1278                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1279                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1280                 return -1;
1281         }
1282
1283         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1284         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1285         return 0;
1286 }
1287
1288 /*
1289   timer to check for seqnum changes in a ltdb and propogate them
1290  */
1291 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1292                                    struct timeval t, void *p)
1293 {
1294         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1295         struct ctdb_context *ctdb = ctdb_db->ctdb;
1296         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1297         if (new_seqnum != ctdb_db->seqnum) {
1298                 /* something has changed - propogate it */
1299                 TDB_DATA data;
1300                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1301                 data.dsize = sizeof(uint32_t);
1302                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1303                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1304                                          data, NULL, NULL);             
1305         }
1306         ctdb_db->seqnum = new_seqnum;
1307
1308         /* setup a new timer */
1309         ctdb_db->seqnum_update =
1310                 event_add_timed(ctdb->ev, ctdb_db, 
1311                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1312                                 ctdb_ltdb_seqnum_check, ctdb_db);
1313 }
1314
1315 /*
1316   enable seqnum handling on this db
1317  */
1318 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1319 {
1320         struct ctdb_db_context *ctdb_db;
1321         ctdb_db = find_ctdb_db(ctdb, db_id);
1322         if (!ctdb_db) {
1323                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1324                 return -1;
1325         }
1326
1327         if (ctdb_db->seqnum_update == NULL) {
1328                 ctdb_db->seqnum_update =
1329                         event_add_timed(ctdb->ev, ctdb_db, 
1330                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1331                                         ctdb_ltdb_seqnum_check, ctdb_db);
1332         }
1333
1334         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1335         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1336         return 0;
1337 }
1338
1339 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1340 {
1341         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1342         struct ctdb_db_context *ctdb_db;
1343
1344         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1345         if (!ctdb_db) {
1346                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1347                 return 0;
1348         }
1349
1350         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1351                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1352                 return 0;
1353         }
1354
1355         ctdb_db->priority = db_prio->priority;
1356         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1357
1358         return 0;
1359 }
1360