5e4128bd3266c233fd3bab8e249b9fea246e0a42
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31 #include "common/reqid.h"
32 #include "common/system.h"
33
34 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35
36 /**
37  * write a record to a normal database
38  *
39  * This is the server-variant of the ctdb_ltdb_store function.
40  * It contains logic to determine whether a record should be
41  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
42  * controls to the local ctdb daemon if apporpriate.
43  */
44 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
45                                   TDB_DATA key,
46                                   struct ctdb_ltdb_header *header,
47                                   TDB_DATA data)
48 {
49         struct ctdb_context *ctdb = ctdb_db->ctdb;
50         TDB_DATA rec;
51         int ret;
52         bool seqnum_suppressed = false;
53         bool keep = false;
54         bool schedule_for_deletion = false;
55         bool remove_from_delete_queue = false;
56         uint32_t lmaster;
57
58         if (ctdb->flags & CTDB_FLAG_TORTURE) {
59                 struct ctdb_ltdb_header *h2;
60                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
61                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
62                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
63                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
64                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
65                 }
66                 if (rec.dptr) free(rec.dptr);
67         }
68
69         if (ctdb->vnn_map == NULL) {
70                 /*
71                  * Called from a client: always store the record
72                  * Also don't call ctdb_lmaster since it uses the vnn_map!
73                  */
74                 keep = true;
75                 goto store;
76         }
77
78         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79
80         /*
81          * If we migrate an empty record off to another node
82          * and the record has not been migrated with data,
83          * delete the record instead of storing the empty record.
84          */
85         if (data.dsize != 0) {
86                 keep = true;
87         } else if (header->flags & CTDB_REC_RO_FLAGS) {
88                 keep = true;
89         } else if (ctdb_db->persistent) {
90                 keep = true;
91         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
92                 /*
93                  * The record is not created by the client but
94                  * automatically by the ctdb_ltdb_fetch logic that
95                  * creates a record with an initial header in the
96                  * ltdb before trying to migrate the record from
97                  * the current lmaster. Keep it instead of trying
98                  * to delete the non-existing record...
99                  */
100                 keep = true;
101                 schedule_for_deletion = true;
102         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
103                 keep = true;
104         } else if (ctdb_db->ctdb->pnn == lmaster) {
105                 /*
106                  * If we are lmaster, then we usually keep the record.
107                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
108                  * and the record is empty and has never been migrated
109                  * with data, then we should delete it instead of storing it.
110                  * This is part of the vacuuming process.
111                  *
112                  * The reason that we usually need to store even empty records
113                  * on the lmaster is that a client operating directly on the
114                  * lmaster (== dmaster) expects the local copy of the record to
115                  * exist after successful ctdb migrate call. If the record does
116                  * not exist, the client goes into a migrate loop and eventually
117                  * fails. So storing the empty record makes sure that we do not
118                  * need to change the client code.
119                  */
120                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
121                         keep = true;
122                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
123                         keep = true;
124                 }
125         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
126                 keep = true;
127         }
128
129         if (keep) {
130                 if (!ctdb_db->persistent &&
131                     (ctdb_db->ctdb->pnn == header->dmaster) &&
132                     !(header->flags & CTDB_REC_RO_FLAGS))
133                 {
134                         header->rsn++;
135
136                         if (data.dsize == 0) {
137                                 schedule_for_deletion = true;
138                         }
139                 }
140                 remove_from_delete_queue = !schedule_for_deletion;
141         }
142
143 store:
144         /*
145          * The VACUUM_MIGRATED flag is only set temporarily for
146          * the above logic when the record was retrieved by a
147          * VACUUM_MIGRATE call and should not be stored in the
148          * database.
149          *
150          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
151          * and there are two cases in which the corresponding record
152          * is stored in the local database:
153          * 1. The record has been migrated with data in the past
154          *    (the MIGRATED_WITH_DATA record flag is set).
155          * 2. The record has been filled with data again since it
156          *    had been submitted in the VACUUM_FETCH message to the
157          *    lmaster.
158          * For such records it is important to not store the
159          * VACUUM_MIGRATED flag in the database.
160          */
161         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
162
163         /*
164          * Similarly, clear the AUTOMATIC flag which should not enter
165          * the local database copy since this would require client
166          * modifications to clear the flag when the client stores
167          * the record.
168          */
169         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
170
171         rec.dsize = sizeof(*header) + data.dsize;
172         rec.dptr = talloc_size(ctdb, rec.dsize);
173         CTDB_NO_MEMORY(ctdb, rec.dptr);
174
175         memcpy(rec.dptr, header, sizeof(*header));
176         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
177
178         /* Databases with seqnum updates enabled only get their seqnum
179            changes when/if we modify the data */
180         if (ctdb_db->seqnum_update != NULL) {
181                 TDB_DATA old;
182                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
183
184                 if ( (old.dsize == rec.dsize)
185                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
186                           rec.dptr+sizeof(struct ctdb_ltdb_header),
187                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
188                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
189                         seqnum_suppressed = true;
190                 }
191                 if (old.dptr) free(old.dptr);
192         }
193
194         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
195                             ctdb_db->db_name,
196                             keep?"storing":"deleting",
197                             ctdb_hash(&key)));
198
199         if (keep) {
200                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
201         } else {
202                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
203         }
204
205         if (ret != 0) {
206                 int lvl = DEBUG_ERR;
207
208                 if (keep == false &&
209                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
210                 {
211                         lvl = DEBUG_DEBUG;
212                 }
213
214                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
215                             "%d - %s\n",
216                             ctdb_db->db_name,
217                             keep?"store":"delete", ret,
218                             tdb_errorstr(ctdb_db->ltdb->tdb)));
219
220                 schedule_for_deletion = false;
221                 remove_from_delete_queue = false;
222         }
223         if (seqnum_suppressed) {
224                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
225         }
226
227         talloc_free(rec.dptr);
228
229         if (schedule_for_deletion) {
230                 int ret2;
231                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
232                 if (ret2 != 0) {
233                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
234                 }
235         }
236
237         if (remove_from_delete_queue) {
238                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
239         }
240
241         return ret;
242 }
243
/*
  state kept for a ctdb request that has been deferred until a record
  chainlock can be obtained; filled in by ctdb_ltdb_lock_requeue() and
  consumed by lock_fetch_callback()
 */
struct lock_fetch_state {
        struct ctdb_context *ctdb;      /* daemon context (ctdb_db->ctdb) */
        struct ctdb_db_context *ctdb_db; /* database the lock was requested on */
        /* function the deferred packet is handed back to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;             /* first argument passed to recv_pkt */
        struct ctdb_req_header *hdr;    /* the deferred request packet */
        uint32_t generation;            /* ctdb_db->generation when the request was deferred */
        bool ignore_generation;         /* if true, requeue even if the generation changed */
};
253
254 /*
255   called when we should retry the operation
256  */
257 static void lock_fetch_callback(void *p, bool locked)
258 {
259         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
260         if (!state->ignore_generation &&
261             state->generation != state->ctdb_db->generation) {
262                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
263                 talloc_free(state->hdr);
264                 return;
265         }
266         state->recv_pkt(state->recv_context, state->hdr);
267         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
268 }
269
270
271 /*
272   do a non-blocking ltdb_lock, deferring this ctdb request until we
273   have the chainlock
274
275   It does the following:
276
277    1) tries to get the chainlock. If it succeeds, then it returns 0
278
279    2) if it fails to get a chainlock immediately then it sets up a
280    non-blocking chainlock via ctdb_lock_record, and when it gets the
281    chainlock it re-submits this ctdb request to the main packet
282    receive function.
283
284    This effectively queues all ctdb requests that cannot be
285    immediately satisfied until it can get the lock. This means that
286    the main ctdb daemon will not block waiting for a chainlock held by
287    a client
288
289    There are 3 possible return values:
290
291        0:    means that it got the lock immediately.
292       -1:    means that it failed to get the lock, and won't retry
293       -2:    means that it failed to get the lock immediately, but will retry
294  */
295 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
296                            TDB_DATA key, struct ctdb_req_header *hdr,
297                            void (*recv_pkt)(void *, struct ctdb_req_header *),
298                            void *recv_context, bool ignore_generation)
299 {
300         int ret;
301         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
302         struct lock_request *lreq;
303         struct lock_fetch_state *state;
304         
305         ret = tdb_chainlock_nonblock(tdb, key);
306
307         if (ret != 0 &&
308             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
309                 /* a hard failure - don't try again */
310                 return -1;
311         }
312
313         /* when torturing, ensure we test the contended path */
314         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
315             random() % 5 == 0) {
316                 ret = -1;
317                 tdb_chainunlock(tdb, key);
318         }
319
320         /* first the non-contended path */
321         if (ret == 0) {
322                 return 0;
323         }
324
325         state = talloc(hdr, struct lock_fetch_state);
326         state->ctdb = ctdb_db->ctdb;
327         state->ctdb_db = ctdb_db;
328         state->hdr = hdr;
329         state->recv_pkt = recv_pkt;
330         state->recv_context = recv_context;
331         state->generation = ctdb_db->generation;
332         state->ignore_generation = ignore_generation;
333
334         /* now the contended path */
335         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
336         if (lreq == NULL) {
337                 return -1;
338         }
339
340         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
341            so it won't be freed yet */
342         talloc_steal(state, hdr);
343
344         /* now tell the caller than we will retry asynchronously */
345         return -2;
346 }
347
348 /*
349   a varient of ctdb_ltdb_lock_requeue that also fetches the record
350  */
351 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
352                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
353                                  struct ctdb_req_header *hdr, TDB_DATA *data,
354                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
355                                  void *recv_context, bool ignore_generation)
356 {
357         int ret;
358
359         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
360                                      recv_context, ignore_generation);
361         if (ret == 0) {
362                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
363                 if (ret != 0) {
364                         int uret;
365                         uret = ctdb_ltdb_unlock(ctdb_db, key);
366                         if (uret != 0) {
367                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
368                         }
369                 }
370         }
371         return ret;
372 }
373
374
375 /*
376   paraoid check to see if the db is empty
377  */
378 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
379 {
380         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
381         int count = tdb_traverse_read(tdb, NULL, NULL);
382         if (count != 0) {
383                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
384                          ctdb_db->db_path));
385                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
386         }
387 }
388
389 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
390                                 struct ctdb_db_context *ctdb_db)
391 {
392         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
393         char *old;
394         char *reason = NULL;
395         TDB_DATA key;
396         TDB_DATA val;
397
398         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
399         key.dsize = strlen(ctdb_db->db_name);
400
401         old = ctdb_db->unhealthy_reason;
402         ctdb_db->unhealthy_reason = NULL;
403
404         val = tdb_fetch(tdb, key);
405         if (val.dsize > 0) {
406                 reason = talloc_strndup(ctdb_db,
407                                         (const char *)val.dptr,
408                                         val.dsize);
409                 if (reason == NULL) {
410                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
411                                            (int)val.dsize));
412                         ctdb_db->unhealthy_reason = old;
413                         free(val.dptr);
414                         return -1;
415                 }
416         }
417
418         if (val.dptr) {
419                 free(val.dptr);
420         }
421
422         talloc_free(old);
423         ctdb_db->unhealthy_reason = reason;
424         return 0;
425 }
426
427 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
428                                   struct ctdb_db_context *ctdb_db,
429                                   const char *given_reason,/* NULL means healthy */
430                                   int num_healthy_nodes)
431 {
432         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
433         int ret;
434         TDB_DATA key;
435         TDB_DATA val;
436         char *new_reason = NULL;
437         char *old_reason = NULL;
438
439         ret = tdb_transaction_start(tdb);
440         if (ret != 0) {
441                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
442                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
443                 return -1;
444         }
445
446         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
447         if (ret != 0) {
448                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
449                                    ctdb_db->db_name, ret));
450                 return -1;
451         }
452         old_reason = ctdb_db->unhealthy_reason;
453
454         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
455         key.dsize = strlen(ctdb_db->db_name);
456
457         if (given_reason) {
458                 new_reason = talloc_strdup(ctdb_db, given_reason);
459                 if (new_reason == NULL) {
460                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
461                                           given_reason));
462                         return -1;
463                 }
464         } else if (old_reason && num_healthy_nodes == 0) {
465                 /*
466                  * If the reason indicates ok, but there where no healthy nodes
467                  * available, that it means, we have not recovered valid content
468                  * of the db. So if there's an old reason, prefix it with
469                  * "NO-HEALTHY-NODES - "
470                  */
471                 const char *prefix;
472
473 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
474                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
475                 if (ret != 0) {
476                         prefix = _TMP_PREFIX;
477                 } else {
478                         prefix = "";
479                 }
480                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
481                                          prefix, old_reason);
482                 if (new_reason == NULL) {
483                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
484                                           prefix, old_reason));
485                         return -1;
486                 }
487 #undef _TMP_PREFIX
488         }
489
490         if (new_reason) {
491                 val.dptr = discard_const_p(uint8_t, new_reason);
492                 val.dsize = strlen(new_reason);
493
494                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
495                 if (ret != 0) {
496                         tdb_transaction_cancel(tdb);
497                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
498                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
499                                            ret, tdb_errorstr(tdb)));
500                         talloc_free(new_reason);
501                         return -1;
502                 }
503                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
504                                    ctdb_db->db_name, new_reason));
505         } else if (old_reason) {
506                 ret = tdb_delete(tdb, key);
507                 if (ret != 0) {
508                         tdb_transaction_cancel(tdb);
509                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
510                                            tdb_name(tdb), ctdb_db->db_name,
511                                            ret, tdb_errorstr(tdb)));
512                         talloc_free(new_reason);
513                         return -1;
514                 }
515                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
516                                    ctdb_db->db_name));
517         }
518
519         ret = tdb_transaction_commit(tdb);
520         if (ret != TDB_SUCCESS) {
521                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
522                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
523                 talloc_free(new_reason);
524                 return -1;
525         }
526
527         talloc_free(old_reason);
528         ctdb_db->unhealthy_reason = new_reason;
529
530         return 0;
531 }
532
533 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
534                                      struct ctdb_db_context *ctdb_db)
535 {
536         time_t now = time(NULL);
537         char *new_path;
538         char *new_reason;
539         int ret;
540         struct tm *tm;
541
542         tm = gmtime(&now);
543
544         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
545         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
546                                    "%04u%02u%02u%02u%02u%02u.0Z",
547                                    ctdb_db->db_path,
548                                    tm->tm_year+1900, tm->tm_mon+1,
549                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
550                                    tm->tm_sec);
551         if (new_path == NULL) {
552                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
553                 return -1;
554         }
555
556         new_reason = talloc_asprintf(ctdb_db,
557                                      "ERROR - Backup of corrupted TDB in '%s'",
558                                      new_path);
559         if (new_reason == NULL) {
560                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
561                 return -1;
562         }
563         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
564         talloc_free(new_reason);
565         if (ret != 0) {
566                 DEBUG(DEBUG_CRIT,(__location__
567                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
568                                  ctdb_db->db_path));
569                 return -1;
570         }
571
572         ret = rename(ctdb_db->db_path, new_path);
573         if (ret != 0) {
574                 DEBUG(DEBUG_CRIT,(__location__
575                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
576                                   ctdb_db->db_path, new_path,
577                                   errno, strerror(errno)));
578                 talloc_free(new_path);
579                 return -1;
580         }
581
582         DEBUG(DEBUG_CRIT,(__location__
583                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
584                          ctdb_db->db_path, new_path));
585         talloc_free(new_path);
586         return 0;
587 }
588
589 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
590 {
591         struct ctdb_db_context *ctdb_db;
592         int ret;
593         int ok = 0;
594         int fail = 0;
595
596         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
597                 if (!ctdb_db->persistent) {
598                         continue;
599                 }
600
601                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
602                 if (ret != 0) {
603                         DEBUG(DEBUG_ALERT,(__location__
604                                            " load persistent health for '%s' failed\n",
605                                            ctdb_db->db_path));
606                         return -1;
607                 }
608
609                 if (ctdb_db->unhealthy_reason == NULL) {
610                         ok++;
611                         DEBUG(DEBUG_INFO,(__location__
612                                    " persistent db '%s' healthy\n",
613                                    ctdb_db->db_path));
614                         continue;
615                 }
616
617                 fail++;
618                 DEBUG(DEBUG_ALERT,(__location__
619                                    " persistent db '%s' unhealthy: %s\n",
620                                    ctdb_db->db_path,
621                                    ctdb_db->unhealthy_reason));
622         }
623         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
624               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
625                ok, fail));
626
627         if (fail != 0) {
628                 return -1;
629         }
630
631         return 0;
632 }
633
634
635 /*
636   mark a database - as healthy
637  */
638 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
639 {
640         uint32_t db_id = *(uint32_t *)indata.dptr;
641         struct ctdb_db_context *ctdb_db;
642         int ret;
643         bool may_recover = false;
644
645         ctdb_db = find_ctdb_db(ctdb, db_id);
646         if (!ctdb_db) {
647                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
648                 return -1;
649         }
650
651         if (ctdb_db->unhealthy_reason) {
652                 may_recover = true;
653         }
654
655         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
656         if (ret != 0) {
657                 DEBUG(DEBUG_ERR,(__location__
658                                  " ctdb_update_persistent_health(%s) failed\n",
659                                  ctdb_db->db_name));
660                 return -1;
661         }
662
663         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
664                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
665                                   ctdb_db->db_name));
666                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
667         }
668
669         return 0;
670 }
671
672 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
673                                    TDB_DATA indata,
674                                    TDB_DATA *outdata)
675 {
676         uint32_t db_id = *(uint32_t *)indata.dptr;
677         struct ctdb_db_context *ctdb_db;
678         int ret;
679
680         ctdb_db = find_ctdb_db(ctdb, db_id);
681         if (!ctdb_db) {
682                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
683                 return -1;
684         }
685
686         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
687         if (ret != 0) {
688                 DEBUG(DEBUG_ERR,(__location__
689                                  " ctdb_load_persistent_health(%s) failed\n",
690                                  ctdb_db->db_name));
691                 return -1;
692         }
693
694         *outdata = tdb_null;
695         if (ctdb_db->unhealthy_reason) {
696                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
697                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
698         }
699
700         return 0;
701 }
702
703
704 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
705 {
706         char *ropath;
707
708         if (ctdb_db->readonly) {
709                 return 0;
710         }
711
712         if (ctdb_db->persistent) {
713                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
714                 return -1;
715         }
716
717         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
718         if (ropath == NULL) {
719                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
720                 return -1;
721         }
722         ctdb_db->rottdb = tdb_open(ropath, 
723                               ctdb->tunable.database_hash_size, 
724                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
725                               O_CREAT|O_RDWR, 0600);
726         if (ctdb_db->rottdb == NULL) {
727                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
728                 talloc_free(ropath);
729                 return -1;
730         }
731
732         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
733
734         ctdb_db->readonly = true;
735
736         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
737
738         talloc_free(ropath);
739         return 0;
740 }
741
742 /*
743   attach to a database, handling both persistent and non-persistent databases
744   return 0 on success, -1 on failure
745  */
746 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
747                              bool persistent, const char *unhealthy_reason,
748                              bool jenkinshash, bool mutexes)
749 {
750         struct ctdb_db_context *ctdb_db, *tmp_db;
751         int ret;
752         struct TDB_DATA key;
753         unsigned tdb_flags;
754         int mode = 0600;
755         int remaining_tries = 0;
756
757         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
758         CTDB_NO_MEMORY(ctdb, ctdb_db);
759
760         ctdb_db->priority = 1;
761         ctdb_db->ctdb = ctdb;
762         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
763         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
764
765         key.dsize = strlen(db_name)+1;
766         key.dptr  = discard_const(db_name);
767         ctdb_db->db_id = ctdb_hash(&key);
768         ctdb_db->persistent = persistent;
769
770         if (!ctdb_db->persistent) {
771                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
772                 if (ctdb_db->delete_queue == NULL) {
773                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
774                 }
775
776                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
777         }
778
779         /* check for hash collisions */
780         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
781                 if (tmp_db->db_id == ctdb_db->db_id) {
782                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
783                                  tmp_db->db_id, db_name, tmp_db->db_name));
784                         talloc_free(ctdb_db);
785                         return -1;
786                 }
787         }
788
789         if (persistent) {
790                 if (unhealthy_reason) {
791                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
792                                                             unhealthy_reason, 0);
793                         if (ret != 0) {
794                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
795                                                    ctdb_db->db_name, unhealthy_reason, ret));
796                                 talloc_free(ctdb_db);
797                                 return -1;
798                         }
799                 }
800
801                 if (ctdb->max_persistent_check_errors > 0) {
802                         remaining_tries = 1;
803                 }
804                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
805                         remaining_tries = 0;
806                 }
807
808                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
809                 if (ret != 0) {
810                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
811                                    ctdb_db->db_name, ret));
812                         talloc_free(ctdb_db);
813                         return -1;
814                 }
815         }
816
817         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
818                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
819                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
820                 talloc_free(ctdb_db);
821                 return -1;
822         }
823
824         if (ctdb_db->unhealthy_reason) {
825                 /* this is just a warning, but we want that in the log file! */
826                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
827                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
828         }
829
830         /* open the database */
831         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
832                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
833                                            db_name, ctdb->pnn);
834
835         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
836         if (ctdb->valgrinding) {
837                 tdb_flags |= TDB_NOMMAP;
838         }
839         tdb_flags |= TDB_DISALLOW_NESTING;
840         if (jenkinshash) {
841                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
842         }
843 #ifdef TDB_MUTEX_LOCKING
844         if (ctdb->tunable.mutex_enabled && mutexes &&
845             tdb_runtime_check_for_robust_mutexes()) {
846                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
847         }
848 #endif
849
850 again:
851         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
852                                       ctdb->tunable.database_hash_size, 
853                                       tdb_flags, 
854                                       O_CREAT|O_RDWR, mode);
855         if (ctdb_db->ltdb == NULL) {
856                 struct stat st;
857                 int saved_errno = errno;
858
859                 if (!persistent) {
860                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
861                                           ctdb_db->db_path,
862                                           saved_errno,
863                                           strerror(saved_errno)));
864                         talloc_free(ctdb_db);
865                         return -1;
866                 }
867
868                 if (remaining_tries == 0) {
869                         DEBUG(DEBUG_CRIT,(__location__
870                                           "Failed to open persistent tdb '%s': %d - %s\n",
871                                           ctdb_db->db_path,
872                                           saved_errno,
873                                           strerror(saved_errno)));
874                         talloc_free(ctdb_db);
875                         return -1;
876                 }
877
878                 ret = stat(ctdb_db->db_path, &st);
879                 if (ret != 0) {
880                         DEBUG(DEBUG_CRIT,(__location__
881                                           "Failed to open persistent tdb '%s': %d - %s\n",
882                                           ctdb_db->db_path,
883                                           saved_errno,
884                                           strerror(saved_errno)));
885                         talloc_free(ctdb_db);
886                         return -1;
887                 }
888
889                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
890                 if (ret != 0) {
891                         DEBUG(DEBUG_CRIT,(__location__
892                                           "Failed to open persistent tdb '%s': %d - %s\n",
893                                           ctdb_db->db_path,
894                                           saved_errno,
895                                           strerror(saved_errno)));
896                         talloc_free(ctdb_db);
897                         return -1;
898                 }
899
900                 remaining_tries--;
901                 mode = st.st_mode;
902                 goto again;
903         }
904
905         if (!persistent) {
906                 ctdb_check_db_empty(ctdb_db);
907         } else {
908                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
909                 if (ret != 0) {
910                         int fd;
911                         struct stat st;
912
913                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
914                                           ctdb_db->db_path, ret,
915                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
916                         if (remaining_tries == 0) {
917                                 talloc_free(ctdb_db);
918                                 return -1;
919                         }
920
921                         fd = tdb_fd(ctdb_db->ltdb->tdb);
922                         ret = fstat(fd, &st);
923                         if (ret != 0) {
924                                 DEBUG(DEBUG_CRIT,(__location__
925                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
926                                                   ctdb_db->db_path,
927                                                   errno,
928                                                   strerror(errno)));
929                                 talloc_free(ctdb_db);
930                                 return -1;
931                         }
932
933                         /* close the TDB */
934                         talloc_free(ctdb_db->ltdb);
935                         ctdb_db->ltdb = NULL;
936
937                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
938                         if (ret != 0) {
939                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
940                                                   ctdb_db->db_path));
941                                 talloc_free(ctdb_db);
942                                 return -1;
943                         }
944
945                         remaining_tries--;
946                         mode = st.st_mode;
947                         goto again;
948                 }
949         }
950
951         /* set up a rb tree we can use to track which records we have a 
952            fetch-lock in-flight for so we can defer any additional calls
953            for the same record.
954          */
955         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
956         if (ctdb_db->deferred_fetch == NULL) {
957                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
958                 talloc_free(ctdb_db);
959                 return -1;
960         }
961
962         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
963         if (ctdb_db->defer_dmaster == NULL) {
964                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
965                                   ctdb_db->db_name));
966                 talloc_free(ctdb_db);
967                 return -1;
968         }
969
970         DLIST_ADD(ctdb->db_list, ctdb_db);
971
972         /* setting this can help some high churn databases */
973         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
974
975         /* 
976            all databases support the "null" function. we need this in
977            order to do forced migration of records
978         */
979         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
980         if (ret != 0) {
981                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
982                 talloc_free(ctdb_db);
983                 return -1;
984         }
985
986         /* 
987            all databases support the "fetch" function. we need this
988            for efficient Samba3 ctdb fetch
989         */
990         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
991         if (ret != 0) {
992                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
993                 talloc_free(ctdb_db);
994                 return -1;
995         }
996
997         /* 
998            all databases support the "fetch_with_header" function. we need this
999            for efficient readonly record fetches
1000         */
1001         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1002         if (ret != 0) {
1003                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1004                 talloc_free(ctdb_db);
1005                 return -1;
1006         }
1007
1008         ret = ctdb_vacuum_init(ctdb_db);
1009         if (ret != 0) {
1010                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1011                                   "database '%s'\n", ctdb_db->db_name));
1012                 talloc_free(ctdb_db);
1013                 return -1;
1014         }
1015
1016         ctdb_db->generation = ctdb->vnn_map->generation;
1017
1018         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1019                             ctdb_db->db_path, tdb_flags));
1020
1021         /* success */
1022         return 0;
1023 }
1024
1025
/*
  bookkeeping for a client DB attach control that has been deferred.
  Instances are linked into ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
        /* doubly linked list pointers used by the DLIST_* macros */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context whose deferred_attach list holds this entry */
        struct ctdb_context *ctdb;

        /* the deferred control packet (talloc-stolen onto this context) */
        struct ctdb_req_control *c;
};
1031
1032
1033 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1034 {
1035         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1036
1037         return 0;
1038 }
1039
1040 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1041 {
1042         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1043         struct ctdb_context *ctdb = da_ctx->ctdb;
1044
1045         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1046         talloc_free(da_ctx);
1047 }
1048
1049 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1050 {
1051         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1052         struct ctdb_context *ctdb = da_ctx->ctdb;
1053
1054         /* This talloc-steals the packet ->c */
1055         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1056         talloc_free(da_ctx);
1057 }
1058
1059 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1060 {
1061         struct ctdb_deferred_attach_context *da_ctx;
1062
1063         /* call it from the main event loop as soon as the current event 
1064            finishes.
1065          */
1066         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1067                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1068                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1069         }
1070
1071         return 0;
1072 }
1073
1074 /*
1075   a client has asked to attach a new database
1076  */
1077 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1078                                TDB_DATA *outdata, uint64_t tdb_flags, 
1079                                bool persistent, uint32_t client_id,
1080                                struct ctdb_req_control *c,
1081                                bool *async_reply)
1082 {
1083         const char *db_name = (const char *)indata.dptr;
1084         struct ctdb_db_context *db;
1085         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1086         struct ctdb_client *client = NULL;
1087         bool with_jenkinshash, with_mutexes;
1088
1089         if (ctdb->tunable.allow_client_db_attach == 0) {
1090                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1091                                   "AllowClientDBAccess == 0\n", db_name));
1092                 return -1;
1093         }
1094
1095         /* dont allow any local clients to attach while we are in recovery mode
1096          * except for the recovery daemon.
1097          * allow all attach from the network since these are always from remote
1098          * recovery daemons.
1099          */
1100         if (client_id != 0) {
1101                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1102         }
1103         if (client != NULL) {
1104                 /* If the node is inactive it is not part of the cluster
1105                    and we should not allow clients to attach to any
1106                    databases
1107                 */
1108                 if (node->flags & NODE_FLAGS_INACTIVE) {
1109                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1110                         return -1;
1111                 }
1112
1113                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1114                     client->pid != ctdb->recoverd_pid &&
1115                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1116                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1117
1118                         if (da_ctx == NULL) {
1119                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1120                                 return -1;
1121                         }
1122
1123                         da_ctx->ctdb = ctdb;
1124                         da_ctx->c = talloc_steal(da_ctx, c);
1125                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1126                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1127
1128                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1129
1130                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1131                         *async_reply = true;
1132                         return 0;
1133                 }
1134         }
1135
1136         /* the client can optionally pass additional tdb flags, but we
1137            only allow a subset of those on the database in ctdb. Note
1138            that tdb_flags is passed in via the (otherwise unused)
1139            srvid to the attach control */
1140 #ifdef TDB_MUTEX_LOCKING
1141         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1142 #else
1143         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1144 #endif
1145
1146         /* see if we already have this name */
1147         db = ctdb_db_handle(ctdb, db_name);
1148         if (db) {
1149                 if (db->persistent != persistent) {
1150                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1151                                           "database %s\n", persistent ? "" : "non-",
1152                                           db-> persistent ? "" : "non-", db_name));
1153                         return -1;
1154                 }
1155                 outdata->dptr  = (uint8_t *)&db->db_id;
1156                 outdata->dsize = sizeof(db->db_id);
1157                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1158                 return 0;
1159         }
1160
1161         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1162 #ifdef TDB_MUTEX_LOCKING
1163         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1164 #else
1165         with_mutexes = false;
1166 #endif
1167
1168         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1169                               with_jenkinshash, with_mutexes) != 0) {
1170                 return -1;
1171         }
1172
1173         db = ctdb_db_handle(ctdb, db_name);
1174         if (!db) {
1175                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1176                 return -1;
1177         }
1178
1179         /* remember the flags the client has specified */
1180         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1181
1182         outdata->dptr  = (uint8_t *)&db->db_id;
1183         outdata->dsize = sizeof(db->db_id);
1184
1185         /* Try to ensure it's locked in mem */
1186         lockdown_memory(ctdb->valgrinding);
1187
1188         /* tell all the other nodes about this database */
1189         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1190                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1191                                                 CTDB_CONTROL_DB_ATTACH,
1192                                  0, CTDB_CTRL_FLAG_NOREPLY,
1193                                  indata, NULL, NULL);
1194
1195         /* success */
1196         return 0;
1197 }
1198
1199 /*
1200  * a client has asked to detach from a database
1201  */
1202 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1203                                uint32_t client_id)
1204 {
1205         uint32_t db_id;
1206         struct ctdb_db_context *ctdb_db;
1207         struct ctdb_client *client = NULL;
1208
1209         db_id = *(uint32_t *)indata.dptr;
1210         ctdb_db = find_ctdb_db(ctdb, db_id);
1211         if (ctdb_db == NULL) {
1212                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1213                                   db_id));
1214                 return -1;
1215         }
1216
1217         if (ctdb->tunable.allow_client_db_attach == 1) {
1218                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1219                                   "Clients are allowed access to databases "
1220                                   "(AllowClientDBAccess == 1)\n",
1221                                   ctdb_db->db_name));
1222                 return -1;
1223         }
1224
1225         if (ctdb_db->persistent) {
1226                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1227                                   "denied\n", ctdb_db->db_name));
1228                 return -1;
1229         }
1230
1231         /* Cannot detach from database when in recovery */
1232         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1233                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1234                 return -1;
1235         }
1236
1237         /* If a control comes from a client, then broadcast it to all nodes.
1238          * Do the actual detach only if the control comes from other daemons.
1239          */
1240         if (client_id != 0) {
1241                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1242                 if (client != NULL) {
1243                         /* forward the control to all the nodes */
1244                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1245                                                  CTDB_CONTROL_DB_DETACH, 0,
1246                                                  CTDB_CTRL_FLAG_NOREPLY,
1247                                                  indata, NULL, NULL);
1248                         return 0;
1249                 }
1250                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1251                                   "for database '%s'\n", ctdb_db->db_name));
1252                 return -1;
1253         }
1254
1255         /* Detach database from recoverd */
1256         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1257                                      CTDB_SRVID_DETACH_DATABASE,
1258                                      indata) != 0) {
1259                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1260                 return -1;
1261         }
1262
1263         /* Disable vacuuming and drop all vacuuming data */
1264         talloc_free(ctdb_db->vacuum_handle);
1265         talloc_free(ctdb_db->delete_queue);
1266
1267         /* Terminate any deferred fetch */
1268         talloc_free(ctdb_db->deferred_fetch);
1269
1270         /* Terminate any traverses */
1271         while (ctdb_db->traverse) {
1272                 talloc_free(ctdb_db->traverse);
1273         }
1274
1275         /* Terminate any revokes */
1276         while (ctdb_db->revokechild_active) {
1277                 talloc_free(ctdb_db->revokechild_active);
1278         }
1279
1280         /* Free readonly tracking database */
1281         if (ctdb_db->readonly) {
1282                 talloc_free(ctdb_db->rottdb);
1283         }
1284
1285         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1286
1287         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1288                              ctdb_db->db_name));
1289         talloc_free(ctdb_db);
1290
1291         return 0;
1292 }
1293
1294 /*
1295   attach to all existing persistent databases
1296  */
1297 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1298                                   const char *unhealthy_reason)
1299 {
1300         DIR *d;
1301         struct dirent *de;
1302
1303         /* open the persistent db directory and scan it for files */
1304         d = opendir(ctdb->db_directory_persistent);
1305         if (d == NULL) {
1306                 return 0;
1307         }
1308
1309         while ((de=readdir(d))) {
1310                 char *p, *s, *q;
1311                 size_t len = strlen(de->d_name);
1312                 uint32_t node;
1313                 int invalid_name = 0;
1314                 
1315                 s = talloc_strdup(ctdb, de->d_name);
1316                 if (s == NULL) {
1317                         closedir(d);
1318                         CTDB_NO_MEMORY(ctdb, s);
1319                 }
1320
1321                 /* only accept names ending in .tdb */
1322                 p = strstr(s, ".tdb.");
1323                 if (len < 7 || p == NULL) {
1324                         talloc_free(s);
1325                         continue;
1326                 }
1327
1328                 /* only accept names ending with .tdb. and any number of digits */
1329                 q = p+5;
1330                 while (*q != 0 && invalid_name == 0) {
1331                         if (!isdigit(*q++)) {
1332                                 invalid_name = 1;
1333                         }
1334                 }
1335                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1336                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1337                         talloc_free(s);
1338                         continue;
1339                 }
1340                 p[4] = 0;
1341
1342                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1343                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1344                         closedir(d);
1345                         talloc_free(s);
1346                         return -1;
1347                 }
1348
1349                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1350
1351                 talloc_free(s);
1352         }
1353         closedir(d);
1354         return 0;
1355 }
1356
1357 int ctdb_attach_databases(struct ctdb_context *ctdb)
1358 {
1359         int ret;
1360         char *persistent_health_path = NULL;
1361         char *unhealthy_reason = NULL;
1362         bool first_try = true;
1363
1364         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1365                                                  ctdb->db_directory_state,
1366                                                  PERSISTENT_HEALTH_TDB,
1367                                                  ctdb->pnn);
1368         if (persistent_health_path == NULL) {
1369                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1370                 return -1;
1371         }
1372
1373 again:
1374
1375         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1376                                                    0, TDB_DISALLOW_NESTING,
1377                                                    O_CREAT | O_RDWR, 0600);
1378         if (ctdb->db_persistent_health == NULL) {
1379                 struct tdb_wrap *tdb;
1380
1381                 if (!first_try) {
1382                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1383                                           persistent_health_path,
1384                                           errno,
1385                                           strerror(errno)));
1386                         talloc_free(persistent_health_path);
1387                         talloc_free(unhealthy_reason);
1388                         return -1;
1389                 }
1390                 first_try = false;
1391
1392                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1393                                                    persistent_health_path,
1394                                                    "was cleared after a failure",
1395                                                    "manual verification needed");
1396                 if (unhealthy_reason == NULL) {
1397                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1398                         talloc_free(persistent_health_path);
1399                         return -1;
1400                 }
1401
1402                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1403                                   persistent_health_path));
1404                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1405                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1406                                     O_CREAT | O_RDWR, 0600);
1407                 if (tdb) {
1408                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1409                                           persistent_health_path,
1410                                           errno,
1411                                           strerror(errno)));
1412                         talloc_free(persistent_health_path);
1413                         talloc_free(unhealthy_reason);
1414                         return -1;
1415                 }
1416
1417                 talloc_free(tdb);
1418                 goto again;
1419         }
1420         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1421         if (ret != 0) {
1422                 struct tdb_wrap *tdb;
1423
1424                 talloc_free(ctdb->db_persistent_health);
1425                 ctdb->db_persistent_health = NULL;
1426
1427                 if (!first_try) {
1428                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1429                                           persistent_health_path));
1430                         talloc_free(persistent_health_path);
1431                         talloc_free(unhealthy_reason);
1432                         return -1;
1433                 }
1434                 first_try = false;
1435
1436                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1437                                                    persistent_health_path,
1438                                                    "was cleared after a failure",
1439                                                    "manual verification needed");
1440                 if (unhealthy_reason == NULL) {
1441                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1442                         talloc_free(persistent_health_path);
1443                         return -1;
1444                 }
1445
1446                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1447                                   persistent_health_path));
1448                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1449                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1450                                     O_CREAT | O_RDWR, 0600);
1451                 if (tdb) {
1452                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1453                                           persistent_health_path,
1454                                           errno,
1455                                           strerror(errno)));
1456                         talloc_free(persistent_health_path);
1457                         talloc_free(unhealthy_reason);
1458                         return -1;
1459                 }
1460
1461                 talloc_free(tdb);
1462                 goto again;
1463         }
1464         talloc_free(persistent_health_path);
1465
1466         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1467         talloc_free(unhealthy_reason);
1468         if (ret != 0) {
1469                 return ret;
1470         }
1471
1472         return 0;
1473 }
1474
1475 /*
1476   called when a broadcast seqnum update comes in
1477  */
1478 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1479 {
1480         struct ctdb_db_context *ctdb_db;
1481         if (srcnode == ctdb->pnn) {
1482                 /* don't update ourselves! */
1483                 return 0;
1484         }
1485
1486         ctdb_db = find_ctdb_db(ctdb, db_id);
1487         if (!ctdb_db) {
1488                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1489                 return -1;
1490         }
1491
1492         if (ctdb_db->unhealthy_reason) {
1493                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1494                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1495                 return -1;
1496         }
1497
1498         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1499         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1500         return 0;
1501 }
1502
1503 /*
1504   timer to check for seqnum changes in a ltdb and propogate them
1505  */
1506 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1507                                    struct timeval t, void *p)
1508 {
1509         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1510         struct ctdb_context *ctdb = ctdb_db->ctdb;
1511         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1512         if (new_seqnum != ctdb_db->seqnum) {
1513                 /* something has changed - propogate it */
1514                 TDB_DATA data;
1515                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1516                 data.dsize = sizeof(uint32_t);
1517                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1518                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1519                                          data, NULL, NULL);             
1520         }
1521         ctdb_db->seqnum = new_seqnum;
1522
1523         /* setup a new timer */
1524         ctdb_db->seqnum_update =
1525                 event_add_timed(ctdb->ev, ctdb_db, 
1526                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1527                                 ctdb_ltdb_seqnum_check, ctdb_db);
1528 }
1529
1530 /*
1531   enable seqnum handling on this db
1532  */
1533 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1534 {
1535         struct ctdb_db_context *ctdb_db;
1536         ctdb_db = find_ctdb_db(ctdb, db_id);
1537         if (!ctdb_db) {
1538                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1539                 return -1;
1540         }
1541
1542         if (ctdb_db->seqnum_update == NULL) {
1543                 ctdb_db->seqnum_update =
1544                         event_add_timed(ctdb->ev, ctdb_db, 
1545                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1546                                         ctdb_ltdb_seqnum_check, ctdb_db);
1547         }
1548
1549         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1550         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1551         return 0;
1552 }
1553
1554 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1555                                      uint32_t client_id)
1556 {
1557         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1558         struct ctdb_db_context *ctdb_db;
1559
1560         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1561         if (!ctdb_db) {
1562                 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1563                         DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1564                                          db_prio->db_id));
1565                 }
1566                 return 0;
1567         }
1568
1569         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1570                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1571                 return 0;
1572         }
1573
1574         ctdb_db->priority = db_prio->priority;
1575         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1576
1577         if (client_id != 0) {
1578                 /* Broadcast the update to the rest of the cluster */
1579                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1580                                          CTDB_CONTROL_SET_DB_PRIORITY, 0,
1581                                          CTDB_CTRL_FLAG_NOREPLY, indata,
1582                                          NULL, NULL);
1583         }
1584         return 0;
1585 }
1586
1587
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1589 {
1590         if (ctdb_db->sticky) {
1591                 return 0;
1592         }
1593
1594         if (ctdb_db->persistent) {
1595                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1596                 return -1;
1597         }
1598
1599         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1600
1601         ctdb_db->sticky = true;
1602
1603         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1604
1605         return 0;
1606 }
1607
1608 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1609 {
1610         struct ctdb_db_statistics *s = &ctdb_db->statistics;
1611         int i;
1612
1613         for (i=0; i<MAX_HOT_KEYS; i++) {
1614                 if (s->hot_keys[i].key.dsize > 0) {
1615                         talloc_free(s->hot_keys[i].key.dptr);
1616                 }
1617         }
1618
1619         ZERO_STRUCT(ctdb_db->statistics);
1620 }
1621
1622 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1623                                 uint32_t db_id,
1624                                 TDB_DATA *outdata)
1625 {
1626         struct ctdb_db_context *ctdb_db;
1627         struct ctdb_db_statistics *stats;
1628         int i;
1629         int len;
1630         char *ptr;
1631
1632         ctdb_db = find_ctdb_db(ctdb, db_id);
1633         if (!ctdb_db) {
1634                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1635                 return -1;
1636         }
1637
1638         len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1639         for (i = 0; i < MAX_HOT_KEYS; i++) {
1640                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1641         }
1642
1643         stats = talloc_size(outdata, len);
1644         if (stats == NULL) {
1645                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1646                 return -1;
1647         }
1648
1649         memcpy(stats, &ctdb_db->statistics,
1650                offsetof(struct ctdb_db_statistics, hot_keys_wire));
1651
1652         stats->num_hot_keys = MAX_HOT_KEYS;
1653
1654         ptr = &stats->hot_keys_wire[0];
1655         for (i = 0; i < MAX_HOT_KEYS; i++) {
1656                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1657                        ctdb_db->statistics.hot_keys[i].key.dsize);
1658                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1659         }
1660
1661         outdata->dptr  = (uint8_t *)stats;
1662         outdata->dsize = len;
1663
1664         return 0;
1665 }