ctdb-daemon: Add implementation for CTDB_CONTROL_DB_ATTACH_REPLICATED control
[nivanova/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (ctdb_db_volatile(ctdb_db) &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
257 struct lock_fetch_state {
258         struct ctdb_context *ctdb;
259         struct ctdb_db_context *ctdb_db;
260         void (*recv_pkt)(void *, struct ctdb_req_header *);
261         void *recv_context;
262         struct ctdb_req_header *hdr;
263         uint32_t generation;
264         bool ignore_generation;
265 };
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db_persistent(ctdb_db)) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db_readonly(ctdb_db)) {
722                 return 0;
723         }
724
725         if (! ctdb_db_volatile(ctdb_db)) {
726                 DEBUG(DEBUG_ERR,
727                       ("Non-volatile databases do not support readonly flag\n"));
728                 return -1;
729         }
730
731         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732         if (ropath == NULL) {
733                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
734                 return -1;
735         }
736         ctdb_db->rottdb = tdb_open(ropath, 
737                               ctdb->tunable.database_hash_size, 
738                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739                               O_CREAT|O_RDWR, 0600);
740         if (ctdb_db->rottdb == NULL) {
741                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
742                 talloc_free(ropath);
743                 return -1;
744         }
745
746         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747
748         ctdb_db_set_readonly(ctdb_db);
749
750         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
751
752         talloc_free(ropath);
753         return 0;
754 }
755
756 /*
757   attach to a database, handling both persistent and non-persistent databases
758   return 0 on success, -1 on failure
759  */
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761                              uint8_t db_flags, const char *unhealthy_reason)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         int tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769
770         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771         CTDB_NO_MEMORY(ctdb, ctdb_db);
772
773         ctdb_db->ctdb = ctdb;
774         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
776
777         key.dsize = strlen(db_name)+1;
778         key.dptr  = discard_const(db_name);
779         ctdb_db->db_id = ctdb_hash(&key);
780         ctdb_db->db_flags = db_flags;
781
782         if (ctdb_db_volatile(ctdb_db)) {
783                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784                 if (ctdb_db->delete_queue == NULL) {
785                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
786                 }
787
788                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
789         }
790
791         /* check for hash collisions */
792         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793                 if (tmp_db->db_id == ctdb_db->db_id) {
794                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795                                  tmp_db->db_id, db_name, tmp_db->db_name));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (ctdb_db_persistent(ctdb_db)) {
802                 if (unhealthy_reason) {
803                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804                                                             unhealthy_reason, 0);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807                                                    ctdb_db->db_name, unhealthy_reason, ret));
808                                 talloc_free(ctdb_db);
809                                 return -1;
810                         }
811                 }
812
813                 if (ctdb->max_persistent_check_errors > 0) {
814                         remaining_tries = 1;
815                 }
816                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817                         remaining_tries = 0;
818                 }
819
820                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821                 if (ret != 0) {
822                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823                                    ctdb_db->db_name, ret));
824                         talloc_free(ctdb_db);
825                         return -1;
826                 }
827         }
828
829         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
832                 talloc_free(ctdb_db);
833                 return -1;
834         }
835
836         if (ctdb_db->unhealthy_reason) {
837                 /* this is just a warning, but we want that in the log file! */
838                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840         }
841
842         /* open the database */
843         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844                                            ctdb_db_persistent(ctdb_db) ?
845                                                 ctdb->db_directory_persistent :
846                                                 ctdb->db_directory,
847                                            db_name, ctdb->pnn);
848
849         tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
850                                       ctdb->tunable.mutex_enabled);
851
852 again:
853         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
854                                       ctdb->tunable.database_hash_size, 
855                                       tdb_flags, 
856                                       O_CREAT|O_RDWR, mode);
857         if (ctdb_db->ltdb == NULL) {
858                 struct stat st;
859                 int saved_errno = errno;
860
861                 if (! ctdb_db_persistent(ctdb_db)) {
862                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
863                                           ctdb_db->db_path,
864                                           saved_errno,
865                                           strerror(saved_errno)));
866                         talloc_free(ctdb_db);
867                         return -1;
868                 }
869
870                 if (remaining_tries == 0) {
871                         DEBUG(DEBUG_CRIT,(__location__
872                                           "Failed to open persistent tdb '%s': %d - %s\n",
873                                           ctdb_db->db_path,
874                                           saved_errno,
875                                           strerror(saved_errno)));
876                         talloc_free(ctdb_db);
877                         return -1;
878                 }
879
880                 ret = stat(ctdb_db->db_path, &st);
881                 if (ret != 0) {
882                         DEBUG(DEBUG_CRIT,(__location__
883                                           "Failed to open persistent tdb '%s': %d - %s\n",
884                                           ctdb_db->db_path,
885                                           saved_errno,
886                                           strerror(saved_errno)));
887                         talloc_free(ctdb_db);
888                         return -1;
889                 }
890
891                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
892                 if (ret != 0) {
893                         DEBUG(DEBUG_CRIT,(__location__
894                                           "Failed to open persistent tdb '%s': %d - %s\n",
895                                           ctdb_db->db_path,
896                                           saved_errno,
897                                           strerror(saved_errno)));
898                         talloc_free(ctdb_db);
899                         return -1;
900                 }
901
902                 remaining_tries--;
903                 mode = st.st_mode;
904                 goto again;
905         }
906
907         if (!ctdb_db_persistent(ctdb_db)) {
908                 ctdb_check_db_empty(ctdb_db);
909         } else {
910                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
911                 if (ret != 0) {
912                         int fd;
913                         struct stat st;
914
915                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
916                                           ctdb_db->db_path, ret,
917                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
918                         if (remaining_tries == 0) {
919                                 talloc_free(ctdb_db);
920                                 return -1;
921                         }
922
923                         fd = tdb_fd(ctdb_db->ltdb->tdb);
924                         ret = fstat(fd, &st);
925                         if (ret != 0) {
926                                 DEBUG(DEBUG_CRIT,(__location__
927                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
928                                                   ctdb_db->db_path,
929                                                   errno,
930                                                   strerror(errno)));
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         /* close the TDB */
936                         talloc_free(ctdb_db->ltdb);
937                         ctdb_db->ltdb = NULL;
938
939                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
940                         if (ret != 0) {
941                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
942                                                   ctdb_db->db_path));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         remaining_tries--;
948                         mode = st.st_mode;
949                         goto again;
950                 }
951         }
952
953         /* remember the flags the client has specified */
954         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
955
956
957         /* set up a rb tree we can use to track which records we have a 
958            fetch-lock in-flight for so we can defer any additional calls
959            for the same record.
960          */
961         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
962         if (ctdb_db->deferred_fetch == NULL) {
963                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
964                 talloc_free(ctdb_db);
965                 return -1;
966         }
967
968         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
969         if (ctdb_db->defer_dmaster == NULL) {
970                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
971                                   ctdb_db->db_name));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         DLIST_ADD(ctdb->db_list, ctdb_db);
977
978         /* setting this can help some high churn databases */
979         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
980
981         /* 
982            all databases support the "null" function. we need this in
983            order to do forced migration of records
984         */
985         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
986         if (ret != 0) {
987                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988                 talloc_free(ctdb_db);
989                 return -1;
990         }
991
992         /* 
993            all databases support the "fetch" function. we need this
994            for efficient Samba3 ctdb fetch
995         */
996         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
997         if (ret != 0) {
998                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999                 talloc_free(ctdb_db);
1000                 return -1;
1001         }
1002
1003         /* 
1004            all databases support the "fetch_with_header" function. we need this
1005            for efficient readonly record fetches
1006         */
1007         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010                 talloc_free(ctdb_db);
1011                 return -1;
1012         }
1013
1014         ret = ctdb_vacuum_init(ctdb_db);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017                                   "database '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022         ret = ctdb_migration_init(ctdb_db);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_ERR,
1025                       ("Failed to setup migration tracking for db '%s'\n",
1026                        ctdb_db->db_name));
1027                 talloc_free(ctdb_db);
1028                 return -1;
1029         }
1030
1031         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1032                            &ctdb_db->lock_log);
1033         if (ret != 0) {
1034                 DEBUG(DEBUG_ERR,
1035                       ("Failed to setup lock logging for db '%s'\n",
1036                        ctdb_db->db_name));
1037                 talloc_free(ctdb_db);
1038                 return -1;
1039         }
1040
1041         ctdb_db->generation = ctdb->vnn_map->generation;
1042
1043         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1044                             ctdb_db->db_path, tdb_flags));
1045
1046         /* success */
1047         return 0;
1048 }
1049
1050
/*
  one DB attach control that has been put on hold; instances are chained
  on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* doubly linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context owning the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the held-back control request packet */
	struct ctdb_req_control_old *c;
};
1056
1057
1058 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1059 {
1060         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1061
1062         return 0;
1063 }
1064
1065 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1066                                          struct tevent_timer *te,
1067                                          struct timeval t, void *private_data)
1068 {
1069         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070         struct ctdb_context *ctdb = da_ctx->ctdb;
1071
1072         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1073         talloc_free(da_ctx);
1074 }
1075
1076 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1077                                           struct tevent_timer *te,
1078                                           struct timeval t, void *private_data)
1079 {
1080         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1081         struct ctdb_context *ctdb = da_ctx->ctdb;
1082
1083         /* This talloc-steals the packet ->c */
1084         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1085         talloc_free(da_ctx);
1086 }
1087
1088 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1089 {
1090         struct ctdb_deferred_attach_context *da_ctx;
1091
1092         /* call it from the main event loop as soon as the current event 
1093            finishes.
1094          */
1095         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1096                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1097                 tevent_add_timer(ctdb->ev, da_ctx,
1098                                  timeval_current_ofs(1,0),
1099                                  ctdb_deferred_attach_callback, da_ctx);
1100         }
1101
1102         return 0;
1103 }
1104
1105 /*
1106   a client has asked to attach a new database
1107  */
1108 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1109                                TDB_DATA *outdata,
1110                                uint8_t db_flags, uint32_t client_id,
1111                                struct ctdb_req_control_old *c,
1112                                bool *async_reply)
1113 {
1114         const char *db_name = (const char *)indata.dptr;
1115         struct ctdb_db_context *db;
1116         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1117         struct ctdb_client *client = NULL;
1118         uint32_t opcode;
1119
1120         if (ctdb->tunable.allow_client_db_attach == 0) {
1121                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1122                                   "AllowClientDBAccess == 0\n", db_name));
1123                 return -1;
1124         }
1125
1126         /* don't allow any local clients to attach while we are in recovery mode
1127          * except for the recovery daemon.
1128          * allow all attach from the network since these are always from remote
1129          * recovery daemons.
1130          */
1131         if (client_id != 0) {
1132                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1133         }
1134         if (client != NULL) {
1135                 /* If the node is inactive it is not part of the cluster
1136                    and we should not allow clients to attach to any
1137                    databases
1138                 */
1139                 if (node->flags & NODE_FLAGS_INACTIVE) {
1140                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1141                         return -1;
1142                 }
1143
1144                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1145                     client->pid != ctdb->recoverd_pid &&
1146                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1147                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1148
1149                         if (da_ctx == NULL) {
1150                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1151                                 return -1;
1152                         }
1153
1154                         da_ctx->ctdb = ctdb;
1155                         da_ctx->c = talloc_steal(da_ctx, c);
1156                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1157                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1158
1159                         tevent_add_timer(ctdb->ev, da_ctx,
1160                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1161                                          ctdb_deferred_attach_timeout, da_ctx);
1162
1163                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1164                         *async_reply = true;
1165                         return 0;
1166                 }
1167         }
1168
1169         /* see if we already have this name */
1170         db = ctdb_db_handle(ctdb, db_name);
1171         if (db) {
1172                 if ((db->db_flags & db_flags) != db_flags) {
1173                         DEBUG(DEBUG_ERR,
1174                               ("Error: Failed to re-attach with 0x%x flags,"
1175                                " database has 0x%x flags\n", db_flags,
1176                                db->db_flags));
1177                         return -1;
1178                 }
1179                 outdata->dptr  = (uint8_t *)&db->db_id;
1180                 outdata->dsize = sizeof(db->db_id);
1181                 return 0;
1182         }
1183
1184         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1185                 return -1;
1186         }
1187
1188         db = ctdb_db_handle(ctdb, db_name);
1189         if (!db) {
1190                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1191                 return -1;
1192         }
1193
1194         outdata->dptr  = (uint8_t *)&db->db_id;
1195         outdata->dsize = sizeof(db->db_id);
1196
1197         /* Try to ensure it's locked in mem */
1198         lockdown_memory(ctdb->valgrinding);
1199
1200         if (ctdb_db_persistent(db)) {
1201                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1202         } else if (ctdb_db_replicated(db)) {
1203                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1204         } else {
1205                 opcode = CTDB_CONTROL_DB_ATTACH;
1206         }
1207
1208         /* tell all the other nodes about this database */
1209         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0, opcode,
1210                                  0, CTDB_CTRL_FLAG_NOREPLY,
1211                                  indata, NULL, NULL);
1212
1213         /* success */
1214         return 0;
1215 }
1216
1217 /*
1218  * a client has asked to detach from a database
1219  */
1220 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1221                                uint32_t client_id)
1222 {
1223         uint32_t db_id;
1224         struct ctdb_db_context *ctdb_db;
1225         struct ctdb_client *client = NULL;
1226
1227         db_id = *(uint32_t *)indata.dptr;
1228         ctdb_db = find_ctdb_db(ctdb, db_id);
1229         if (ctdb_db == NULL) {
1230                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1231                                   db_id));
1232                 return -1;
1233         }
1234
1235         if (ctdb->tunable.allow_client_db_attach == 1) {
1236                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1237                                   "Clients are allowed access to databases "
1238                                   "(AllowClientDBAccess == 1)\n",
1239                                   ctdb_db->db_name));
1240                 return -1;
1241         }
1242
1243         if (! ctdb_db_volatile(ctdb_db)) {
1244                 DEBUG(DEBUG_ERR,
1245                       ("Detaching non-volatile database %s denied\n",
1246                        ctdb_db->db_name));
1247                 return -1;
1248         }
1249
1250         /* Cannot detach from database when in recovery */
1251         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1252                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1253                 return -1;
1254         }
1255
1256         /* If a control comes from a client, then broadcast it to all nodes.
1257          * Do the actual detach only if the control comes from other daemons.
1258          */
1259         if (client_id != 0) {
1260                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1261                 if (client != NULL) {
1262                         /* forward the control to all the nodes */
1263                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1264                                                  CTDB_CONTROL_DB_DETACH, 0,
1265                                                  CTDB_CTRL_FLAG_NOREPLY,
1266                                                  indata, NULL, NULL);
1267                         return 0;
1268                 }
1269                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1270                                   "for database '%s'\n", ctdb_db->db_name));
1271                 return -1;
1272         }
1273
1274         /* Detach database from recoverd */
1275         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1276                                      CTDB_SRVID_DETACH_DATABASE,
1277                                      indata) != 0) {
1278                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1279                 return -1;
1280         }
1281
1282         /* Disable vacuuming and drop all vacuuming data */
1283         talloc_free(ctdb_db->vacuum_handle);
1284         talloc_free(ctdb_db->delete_queue);
1285
1286         /* Terminate any deferred fetch */
1287         talloc_free(ctdb_db->deferred_fetch);
1288
1289         /* Terminate any traverses */
1290         while (ctdb_db->traverse) {
1291                 talloc_free(ctdb_db->traverse);
1292         }
1293
1294         /* Terminate any revokes */
1295         while (ctdb_db->revokechild_active) {
1296                 talloc_free(ctdb_db->revokechild_active);
1297         }
1298
1299         /* Free readonly tracking database */
1300         if (ctdb_db_readonly(ctdb_db)) {
1301                 talloc_free(ctdb_db->rottdb);
1302         }
1303
1304         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1305
1306         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1307                              ctdb_db->db_name));
1308         talloc_free(ctdb_db);
1309
1310         return 0;
1311 }
1312
1313 /*
1314   attach to all existing persistent databases
1315  */
1316 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1317                                   const char *unhealthy_reason)
1318 {
1319         DIR *d;
1320         struct dirent *de;
1321
1322         /* open the persistent db directory and scan it for files */
1323         d = opendir(ctdb->db_directory_persistent);
1324         if (d == NULL) {
1325                 return 0;
1326         }
1327
1328         while ((de=readdir(d))) {
1329                 char *p, *s, *q;
1330                 size_t len = strlen(de->d_name);
1331                 uint32_t node;
1332                 int invalid_name = 0;
1333                 
1334                 s = talloc_strdup(ctdb, de->d_name);
1335                 if (s == NULL) {
1336                         closedir(d);
1337                         CTDB_NO_MEMORY(ctdb, s);
1338                 }
1339
1340                 /* only accept names ending in .tdb */
1341                 p = strstr(s, ".tdb.");
1342                 if (len < 7 || p == NULL) {
1343                         talloc_free(s);
1344                         continue;
1345                 }
1346
1347                 /* only accept names ending with .tdb. and any number of digits */
1348                 q = p+5;
1349                 while (*q != 0 && invalid_name == 0) {
1350                         if (!isdigit(*q++)) {
1351                                 invalid_name = 1;
1352                         }
1353                 }
1354                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1355                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1356                         talloc_free(s);
1357                         continue;
1358                 }
1359                 p[4] = 0;
1360
1361                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1362                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1363                         closedir(d);
1364                         talloc_free(s);
1365                         return -1;
1366                 }
1367
1368                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1369
1370                 talloc_free(s);
1371         }
1372         closedir(d);
1373         return 0;
1374 }
1375
1376 int ctdb_attach_databases(struct ctdb_context *ctdb)
1377 {
1378         int ret;
1379         char *persistent_health_path = NULL;
1380         char *unhealthy_reason = NULL;
1381         bool first_try = true;
1382
1383         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1384                                                  ctdb->db_directory_state,
1385                                                  PERSISTENT_HEALTH_TDB,
1386                                                  ctdb->pnn);
1387         if (persistent_health_path == NULL) {
1388                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1389                 return -1;
1390         }
1391
1392 again:
1393
1394         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1395                                                    0, TDB_DISALLOW_NESTING,
1396                                                    O_CREAT | O_RDWR, 0600);
1397         if (ctdb->db_persistent_health == NULL) {
1398                 struct tdb_wrap *tdb;
1399
1400                 if (!first_try) {
1401                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1402                                           persistent_health_path,
1403                                           errno,
1404                                           strerror(errno)));
1405                         talloc_free(persistent_health_path);
1406                         talloc_free(unhealthy_reason);
1407                         return -1;
1408                 }
1409                 first_try = false;
1410
1411                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1412                                                    persistent_health_path,
1413                                                    "was cleared after a failure",
1414                                                    "manual verification needed");
1415                 if (unhealthy_reason == NULL) {
1416                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1417                         talloc_free(persistent_health_path);
1418                         return -1;
1419                 }
1420
1421                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1422                                   persistent_health_path));
1423                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1424                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1425                                     O_CREAT | O_RDWR, 0600);
1426                 if (tdb) {
1427                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1428                                           persistent_health_path,
1429                                           errno,
1430                                           strerror(errno)));
1431                         talloc_free(persistent_health_path);
1432                         talloc_free(unhealthy_reason);
1433                         return -1;
1434                 }
1435
1436                 talloc_free(tdb);
1437                 goto again;
1438         }
1439         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1440         if (ret != 0) {
1441                 struct tdb_wrap *tdb;
1442
1443                 talloc_free(ctdb->db_persistent_health);
1444                 ctdb->db_persistent_health = NULL;
1445
1446                 if (!first_try) {
1447                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1448                                           persistent_health_path));
1449                         talloc_free(persistent_health_path);
1450                         talloc_free(unhealthy_reason);
1451                         return -1;
1452                 }
1453                 first_try = false;
1454
1455                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1456                                                    persistent_health_path,
1457                                                    "was cleared after a failure",
1458                                                    "manual verification needed");
1459                 if (unhealthy_reason == NULL) {
1460                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1461                         talloc_free(persistent_health_path);
1462                         return -1;
1463                 }
1464
1465                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1466                                   persistent_health_path));
1467                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1468                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1469                                     O_CREAT | O_RDWR, 0600);
1470                 if (tdb) {
1471                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1472                                           persistent_health_path,
1473                                           errno,
1474                                           strerror(errno)));
1475                         talloc_free(persistent_health_path);
1476                         talloc_free(unhealthy_reason);
1477                         return -1;
1478                 }
1479
1480                 talloc_free(tdb);
1481                 goto again;
1482         }
1483         talloc_free(persistent_health_path);
1484
1485         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1486         talloc_free(unhealthy_reason);
1487         if (ret != 0) {
1488                 return ret;
1489         }
1490
1491         return 0;
1492 }
1493
1494 /*
1495   called when a broadcast seqnum update comes in
1496  */
1497 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1498 {
1499         struct ctdb_db_context *ctdb_db;
1500         if (srcnode == ctdb->pnn) {
1501                 /* don't update ourselves! */
1502                 return 0;
1503         }
1504
1505         ctdb_db = find_ctdb_db(ctdb, db_id);
1506         if (!ctdb_db) {
1507                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1508                 return -1;
1509         }
1510
1511         if (ctdb_db->unhealthy_reason) {
1512                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1513                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1514                 return -1;
1515         }
1516
1517         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1518         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1519         return 0;
1520 }
1521
1522 /*
1523   timer to check for seqnum changes in a ltdb and propogate them
1524  */
1525 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1526                                    struct tevent_timer *te,
1527                                    struct timeval t, void *p)
1528 {
1529         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1530         struct ctdb_context *ctdb = ctdb_db->ctdb;
1531         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1532         if (new_seqnum != ctdb_db->seqnum) {
1533                 /* something has changed - propogate it */
1534                 TDB_DATA data;
1535                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1536                 data.dsize = sizeof(uint32_t);
1537                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1538                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1539                                          data, NULL, NULL);             
1540         }
1541         ctdb_db->seqnum = new_seqnum;
1542
1543         /* setup a new timer */
1544         ctdb_db->seqnum_update =
1545                 tevent_add_timer(ctdb->ev, ctdb_db,
1546                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1547                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1548                                  ctdb_ltdb_seqnum_check, ctdb_db);
1549 }
1550
1551 /*
1552   enable seqnum handling on this db
1553  */
1554 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1555 {
1556         struct ctdb_db_context *ctdb_db;
1557         ctdb_db = find_ctdb_db(ctdb, db_id);
1558         if (!ctdb_db) {
1559                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1560                 return -1;
1561         }
1562
1563         if (ctdb_db->seqnum_update == NULL) {
1564                 ctdb_db->seqnum_update = tevent_add_timer(
1565                         ctdb->ev, ctdb_db,
1566                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1567                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1568                         ctdb_ltdb_seqnum_check, ctdb_db);
1569         }
1570
1571         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1572         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1573         return 0;
1574 }
1575
1576 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1577 {
1578         if (ctdb_db_sticky(ctdb_db)) {
1579                 return 0;
1580         }
1581
1582         if (! ctdb_db_volatile(ctdb_db)) {
1583                 DEBUG(DEBUG_ERR,
1584                       ("Non-volatile databases do not support sticky flag\n"));
1585                 return -1;
1586         }
1587
1588         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1589
1590         ctdb_db_set_sticky(ctdb_db);
1591
1592         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1593
1594         return 0;
1595 }
1596
1597 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1598 {
1599         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1600         int i;
1601
1602         for (i=0; i<MAX_HOT_KEYS; i++) {
1603                 if (s->hot_keys[i].key.dsize > 0) {
1604                         talloc_free(s->hot_keys[i].key.dptr);
1605                 }
1606         }
1607
1608         ZERO_STRUCT(ctdb_db->statistics);
1609 }
1610
1611 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1612                                 uint32_t db_id,
1613                                 TDB_DATA *outdata)
1614 {
1615         struct ctdb_db_context *ctdb_db;
1616         struct ctdb_db_statistics_old *stats;
1617         int i;
1618         int len;
1619         char *ptr;
1620
1621         ctdb_db = find_ctdb_db(ctdb, db_id);
1622         if (!ctdb_db) {
1623                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1624                 return -1;
1625         }
1626
1627         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1628         for (i = 0; i < MAX_HOT_KEYS; i++) {
1629                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1630         }
1631
1632         stats = talloc_size(outdata, len);
1633         if (stats == NULL) {
1634                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1635                 return -1;
1636         }
1637
1638         memcpy(stats, &ctdb_db->statistics,
1639                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1640
1641         stats->num_hot_keys = MAX_HOT_KEYS;
1642
1643         ptr = &stats->hot_keys_wire[0];
1644         for (i = 0; i < MAX_HOT_KEYS; i++) {
1645                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1646                        ctdb_db->statistics.hot_keys[i].key.dsize);
1647                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1648         }
1649
1650         outdata->dptr  = (uint8_t *)stats;
1651         outdata->dsize = len;
1652
1653         return 0;
1654 }