ctdb-protocol: Fix typo in type of return variable
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
45
46 /**
47  * write a record to a normal database
48  *
49  * This is the server-variant of the ctdb_ltdb_store function.
50  * It contains logic to determine whether a record should be
51  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52  * controls to the local ctdb daemon if apporpriate.
53  */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
55                                   TDB_DATA key,
56                                   struct ctdb_ltdb_header *header,
57                                   TDB_DATA data)
58 {
59         struct ctdb_context *ctdb = ctdb_db->ctdb;
60         TDB_DATA rec[2];
61         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
62         int ret;
63         bool seqnum_suppressed = false;
64         bool keep = false;
65         bool schedule_for_deletion = false;
66         bool remove_from_delete_queue = false;
67         uint32_t lmaster;
68
69         if (ctdb->flags & CTDB_FLAG_TORTURE) {
70                 TDB_DATA old;
71                 struct ctdb_ltdb_header *h2;
72
73                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74                 h2 = (struct ctdb_ltdb_header *)old.dptr;
75                 if (old.dptr != NULL &&
76                     old.dsize >= hsize &&
77                     h2->rsn > header->rsn) {
78                         DEBUG(DEBUG_ERR,
79                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
80                                h2->rsn, header->rsn));
81                 }
82                 if (old.dptr) {
83                         free(old.dptr);
84                 }
85         }
86
87         if (ctdb->vnn_map == NULL) {
88                 /*
89                  * Called from a client: always store the record
90                  * Also don't call ctdb_lmaster since it uses the vnn_map!
91                  */
92                 keep = true;
93                 goto store;
94         }
95
96         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97
98         /*
99          * If we migrate an empty record off to another node
100          * and the record has not been migrated with data,
101          * delete the record instead of storing the empty record.
102          */
103         if (data.dsize != 0) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_RO_FLAGS) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
108                 /*
109                  * The record is not created by the client but
110                  * automatically by the ctdb_ltdb_fetch logic that
111                  * creates a record with an initial header in the
112                  * ltdb before trying to migrate the record from
113                  * the current lmaster. Keep it instead of trying
114                  * to delete the non-existing record...
115                  */
116                 keep = true;
117                 schedule_for_deletion = true;
118         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
119                 keep = true;
120         } else if (ctdb_db->ctdb->pnn == lmaster) {
121                 /*
122                  * If we are lmaster, then we usually keep the record.
123                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124                  * and the record is empty and has never been migrated
125                  * with data, then we should delete it instead of storing it.
126                  * This is part of the vacuuming process.
127                  *
128                  * The reason that we usually need to store even empty records
129                  * on the lmaster is that a client operating directly on the
130                  * lmaster (== dmaster) expects the local copy of the record to
131                  * exist after successful ctdb migrate call. If the record does
132                  * not exist, the client goes into a migrate loop and eventually
133                  * fails. So storing the empty record makes sure that we do not
134                  * need to change the client code.
135                  */
136                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
137                         keep = true;
138                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139                         keep = true;
140                 }
141         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
142                 keep = true;
143         }
144
145         if (keep) {
146                 if (ctdb_db_volatile(ctdb_db) &&
147                     (ctdb_db->ctdb->pnn == header->dmaster) &&
148                     !(header->flags & CTDB_REC_RO_FLAGS))
149                 {
150                         header->rsn++;
151
152                         if (data.dsize == 0) {
153                                 schedule_for_deletion = true;
154                         }
155                 }
156                 remove_from_delete_queue = !schedule_for_deletion;
157         }
158
159 store:
160         /*
161          * The VACUUM_MIGRATED flag is only set temporarily for
162          * the above logic when the record was retrieved by a
163          * VACUUM_MIGRATE call and should not be stored in the
164          * database.
165          *
166          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167          * and there are two cases in which the corresponding record
168          * is stored in the local database:
169          * 1. The record has been migrated with data in the past
170          *    (the MIGRATED_WITH_DATA record flag is set).
171          * 2. The record has been filled with data again since it
172          *    had been submitted in the VACUUM_FETCH message to the
173          *    lmaster.
174          * For such records it is important to not store the
175          * VACUUM_MIGRATED flag in the database.
176          */
177         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
178
179         /*
180          * Similarly, clear the AUTOMATIC flag which should not enter
181          * the local database copy since this would require client
182          * modifications to clear the flag when the client stores
183          * the record.
184          */
185         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
186
187         rec[0].dsize = hsize;
188         rec[0].dptr = (uint8_t *)header;
189
190         rec[1].dsize = data.dsize;
191         rec[1].dptr = data.dptr;
192
193         /* Databases with seqnum updates enabled only get their seqnum
194            changes when/if we modify the data */
195         if (ctdb_db->seqnum_update != NULL) {
196                 TDB_DATA old;
197                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
198
199                 if ((old.dsize == hsize + data.dsize) &&
200                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202                         seqnum_suppressed = true;
203                 }
204                 if (old.dptr != NULL) {
205                         free(old.dptr);
206                 }
207         }
208
209         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
210                             ctdb_db->db_name,
211                             keep?"storing":"deleting",
212                             ctdb_hash(&key)));
213
214         if (keep) {
215                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
216         } else {
217                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
218         }
219
220         if (ret != 0) {
221                 int lvl = DEBUG_ERR;
222
223                 if (keep == false &&
224                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
225                 {
226                         lvl = DEBUG_DEBUG;
227                 }
228
229                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
230                             "%d - %s\n",
231                             ctdb_db->db_name,
232                             keep?"store":"delete", ret,
233                             tdb_errorstr(ctdb_db->ltdb->tdb)));
234
235                 schedule_for_deletion = false;
236                 remove_from_delete_queue = false;
237         }
238         if (seqnum_suppressed) {
239                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
240         }
241
242         if (schedule_for_deletion) {
243                 int ret2;
244                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
245                 if (ret2 != 0) {
246                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
247                 }
248         }
249
250         if (remove_from_delete_queue) {
251                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
252         }
253
254         return ret;
255 }
256
/*
  state kept for a request that is deferred until its chainlock has
  been obtained; allocated in ctdb_ltdb_lock_requeue() and consumed by
  lock_fetch_callback()
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* receive function the packet is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument handed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* ctdb_db->generation at the time the request was queued */
	uint32_t generation;
	/* when true, the packet is re-submitted even if the generation changed */
	bool ignore_generation;
};
266
267 /*
268   called when we should retry the operation
269  */
270 static void lock_fetch_callback(void *p, bool locked)
271 {
272         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273         if (!state->ignore_generation &&
274             state->generation != state->ctdb_db->generation) {
275                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276                 talloc_free(state->hdr);
277                 return;
278         }
279         state->recv_pkt(state->recv_context, state->hdr);
280         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
281 }
282
283
284 /*
285   do a non-blocking ltdb_lock, deferring this ctdb request until we
286   have the chainlock
287
288   It does the following:
289
290    1) tries to get the chainlock. If it succeeds, then it returns 0
291
292    2) if it fails to get a chainlock immediately then it sets up a
293    non-blocking chainlock via ctdb_lock_record, and when it gets the
294    chainlock it re-submits this ctdb request to the main packet
295    receive function.
296
297    This effectively queues all ctdb requests that cannot be
298    immediately satisfied until it can get the lock. This means that
299    the main ctdb daemon will not block waiting for a chainlock held by
300    a client
301
302    There are 3 possible return values:
303
304        0:    means that it got the lock immediately.
305       -1:    means that it failed to get the lock, and won't retry
306       -2:    means that it failed to get the lock immediately, but will retry
307  */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
309                            TDB_DATA key, struct ctdb_req_header *hdr,
310                            void (*recv_pkt)(void *, struct ctdb_req_header *),
311                            void *recv_context, bool ignore_generation)
312 {
313         int ret;
314         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315         struct lock_request *lreq;
316         struct lock_fetch_state *state;
317         
318         ret = tdb_chainlock_nonblock(tdb, key);
319
320         if (ret != 0 &&
321             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322                 /* a hard failure - don't try again */
323                 return -1;
324         }
325
326         /* when torturing, ensure we test the contended path */
327         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
328             random() % 5 == 0) {
329                 ret = -1;
330                 tdb_chainunlock(tdb, key);
331         }
332
333         /* first the non-contended path */
334         if (ret == 0) {
335                 return 0;
336         }
337
338         state = talloc(hdr, struct lock_fetch_state);
339         state->ctdb = ctdb_db->ctdb;
340         state->ctdb_db = ctdb_db;
341         state->hdr = hdr;
342         state->recv_pkt = recv_pkt;
343         state->recv_context = recv_context;
344         state->generation = ctdb_db->generation;
345         state->ignore_generation = ignore_generation;
346
347         /* now the contended path */
348         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
349         if (lreq == NULL) {
350                 return -1;
351         }
352
353         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354            so it won't be freed yet */
355         talloc_steal(state, hdr);
356
357         /* now tell the caller than we will retry asynchronously */
358         return -2;
359 }
360
361 /*
362   a varient of ctdb_ltdb_lock_requeue that also fetches the record
363  */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
365                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
366                                  struct ctdb_req_header *hdr, TDB_DATA *data,
367                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
368                                  void *recv_context, bool ignore_generation)
369 {
370         int ret;
371
372         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
373                                      recv_context, ignore_generation);
374         if (ret == 0) {
375                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
376                 if (ret != 0) {
377                         int uret;
378                         uret = ctdb_ltdb_unlock(ctdb_db, key);
379                         if (uret != 0) {
380                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
381                         }
382                 }
383         }
384         return ret;
385 }
386
387
388 /*
389   paraoid check to see if the db is empty
390  */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
392 {
393         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394         int count = tdb_traverse_read(tdb, NULL, NULL);
395         if (count != 0) {
396                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
397                          ctdb_db->db_path));
398                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
399         }
400 }
401
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403                                 struct ctdb_db_context *ctdb_db)
404 {
405         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
406         char *old;
407         char *reason = NULL;
408         TDB_DATA key;
409         TDB_DATA val;
410
411         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412         key.dsize = strlen(ctdb_db->db_name);
413
414         old = ctdb_db->unhealthy_reason;
415         ctdb_db->unhealthy_reason = NULL;
416
417         val = tdb_fetch(tdb, key);
418         if (val.dsize > 0) {
419                 reason = talloc_strndup(ctdb_db,
420                                         (const char *)val.dptr,
421                                         val.dsize);
422                 if (reason == NULL) {
423                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
424                                            (int)val.dsize));
425                         ctdb_db->unhealthy_reason = old;
426                         free(val.dptr);
427                         return -1;
428                 }
429         }
430
431         if (val.dptr) {
432                 free(val.dptr);
433         }
434
435         talloc_free(old);
436         ctdb_db->unhealthy_reason = reason;
437         return 0;
438 }
439
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441                                   struct ctdb_db_context *ctdb_db,
442                                   const char *given_reason,/* NULL means healthy */
443                                   int num_healthy_nodes)
444 {
445         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
446         int ret;
447         TDB_DATA key;
448         TDB_DATA val;
449         char *new_reason = NULL;
450         char *old_reason = NULL;
451
452         ret = tdb_transaction_start(tdb);
453         if (ret != 0) {
454                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
456                 return -1;
457         }
458
459         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
460         if (ret != 0) {
461                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462                                    ctdb_db->db_name, ret));
463                 return -1;
464         }
465         old_reason = ctdb_db->unhealthy_reason;
466
467         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468         key.dsize = strlen(ctdb_db->db_name);
469
470         if (given_reason) {
471                 new_reason = talloc_strdup(ctdb_db, given_reason);
472                 if (new_reason == NULL) {
473                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
474                                           given_reason));
475                         return -1;
476                 }
477         } else if (old_reason && num_healthy_nodes == 0) {
478                 /*
479                  * If the reason indicates ok, but there where no healthy nodes
480                  * available, that it means, we have not recovered valid content
481                  * of the db. So if there's an old reason, prefix it with
482                  * "NO-HEALTHY-NODES - "
483                  */
484                 const char *prefix;
485
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
488                 if (ret != 0) {
489                         prefix = _TMP_PREFIX;
490                 } else {
491                         prefix = "";
492                 }
493                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
494                                          prefix, old_reason);
495                 if (new_reason == NULL) {
496                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497                                           prefix, old_reason));
498                         return -1;
499                 }
500 #undef _TMP_PREFIX
501         }
502
503         if (new_reason) {
504                 val.dptr = discard_const_p(uint8_t, new_reason);
505                 val.dsize = strlen(new_reason);
506
507                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
508                 if (ret != 0) {
509                         tdb_transaction_cancel(tdb);
510                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
512                                            ret, tdb_errorstr(tdb)));
513                         talloc_free(new_reason);
514                         return -1;
515                 }
516                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517                                    ctdb_db->db_name, new_reason));
518         } else if (old_reason) {
519                 ret = tdb_delete(tdb, key);
520                 if (ret != 0) {
521                         tdb_transaction_cancel(tdb);
522                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523                                            tdb_name(tdb), ctdb_db->db_name,
524                                            ret, tdb_errorstr(tdb)));
525                         talloc_free(new_reason);
526                         return -1;
527                 }
528                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
529                                    ctdb_db->db_name));
530         }
531
532         ret = tdb_transaction_commit(tdb);
533         if (ret != TDB_SUCCESS) {
534                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
536                 talloc_free(new_reason);
537                 return -1;
538         }
539
540         talloc_free(old_reason);
541         ctdb_db->unhealthy_reason = new_reason;
542
543         return 0;
544 }
545
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547                                      struct ctdb_db_context *ctdb_db)
548 {
549         time_t now = time(NULL);
550         char *new_path;
551         char *new_reason;
552         int ret;
553         struct tm *tm;
554
555         tm = gmtime(&now);
556
557         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559                                    "%04u%02u%02u%02u%02u%02u.0Z",
560                                    ctdb_db->db_path,
561                                    tm->tm_year+1900, tm->tm_mon+1,
562                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
563                                    tm->tm_sec);
564         if (new_path == NULL) {
565                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
566                 return -1;
567         }
568
569         new_reason = talloc_asprintf(ctdb_db,
570                                      "ERROR - Backup of corrupted TDB in '%s'",
571                                      new_path);
572         if (new_reason == NULL) {
573                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
574                 return -1;
575         }
576         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577         talloc_free(new_reason);
578         if (ret != 0) {
579                 DEBUG(DEBUG_CRIT,(__location__
580                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
581                                  ctdb_db->db_path));
582                 return -1;
583         }
584
585         ret = rename(ctdb_db->db_path, new_path);
586         if (ret != 0) {
587                 DEBUG(DEBUG_CRIT,(__location__
588                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589                                   ctdb_db->db_path, new_path,
590                                   errno, strerror(errno)));
591                 talloc_free(new_path);
592                 return -1;
593         }
594
595         DEBUG(DEBUG_CRIT,(__location__
596                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597                          ctdb_db->db_path, new_path));
598         talloc_free(new_path);
599         return 0;
600 }
601
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
603 {
604         struct ctdb_db_context *ctdb_db;
605         int ret;
606         int ok = 0;
607         int fail = 0;
608
609         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610                 if (!ctdb_db_persistent(ctdb_db)) {
611                         continue;
612                 }
613
614                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
615                 if (ret != 0) {
616                         DEBUG(DEBUG_ALERT,(__location__
617                                            " load persistent health for '%s' failed\n",
618                                            ctdb_db->db_path));
619                         return -1;
620                 }
621
622                 if (ctdb_db->unhealthy_reason == NULL) {
623                         ok++;
624                         DEBUG(DEBUG_INFO,(__location__
625                                    " persistent db '%s' healthy\n",
626                                    ctdb_db->db_path));
627                         continue;
628                 }
629
630                 fail++;
631                 DEBUG(DEBUG_ALERT,(__location__
632                                    " persistent db '%s' unhealthy: %s\n",
633                                    ctdb_db->db_path,
634                                    ctdb_db->unhealthy_reason));
635         }
636         DEBUG(DEBUG_NOTICE,
637               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
638                ok, fail));
639
640         if (fail != 0) {
641                 return -1;
642         }
643
644         return 0;
645 }
646
647
648 /*
649   mark a database - as healthy
650  */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
652 {
653         uint32_t db_id = *(uint32_t *)indata.dptr;
654         struct ctdb_db_context *ctdb_db;
655         int ret;
656         bool may_recover = false;
657
658         ctdb_db = find_ctdb_db(ctdb, db_id);
659         if (!ctdb_db) {
660                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
661                 return -1;
662         }
663
664         if (ctdb_db->unhealthy_reason) {
665                 may_recover = true;
666         }
667
668         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR,(__location__
671                                  " ctdb_update_persistent_health(%s) failed\n",
672                                  ctdb_db->db_name));
673                 return -1;
674         }
675
676         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
678                                   ctdb_db->db_name));
679                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
680         }
681
682         return 0;
683 }
684
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
686                                    TDB_DATA indata,
687                                    TDB_DATA *outdata)
688 {
689         uint32_t db_id = *(uint32_t *)indata.dptr;
690         struct ctdb_db_context *ctdb_db;
691         int ret;
692
693         ctdb_db = find_ctdb_db(ctdb, db_id);
694         if (!ctdb_db) {
695                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
696                 return -1;
697         }
698
699         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
700         if (ret != 0) {
701                 DEBUG(DEBUG_ERR,(__location__
702                                  " ctdb_load_persistent_health(%s) failed\n",
703                                  ctdb_db->db_name));
704                 return -1;
705         }
706
707         *outdata = tdb_null;
708         if (ctdb_db->unhealthy_reason) {
709                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
711         }
712
713         return 0;
714 }
715
716
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
718 {
719         char *ropath;
720
721         if (ctdb_db_readonly(ctdb_db)) {
722                 return 0;
723         }
724
725         if (! ctdb_db_volatile(ctdb_db)) {
726                 DEBUG(DEBUG_ERR,
727                       ("Non-volatile databases do not support readonly flag\n"));
728                 return -1;
729         }
730
731         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732         if (ropath == NULL) {
733                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
734                 return -1;
735         }
736         ctdb_db->rottdb = tdb_open(ropath, 
737                               ctdb->tunable.database_hash_size, 
738                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739                               O_CREAT|O_RDWR, 0600);
740         if (ctdb_db->rottdb == NULL) {
741                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
742                 talloc_free(ropath);
743                 return -1;
744         }
745
746         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747
748         ctdb_db_set_readonly(ctdb_db);
749
750         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
751
752         talloc_free(ropath);
753         return 0;
754 }
755
756 /*
757   attach to a database, handling both persistent and non-persistent databases
758   return 0 on success, -1 on failure
759  */
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761                              uint8_t db_flags, const char *unhealthy_reason)
762 {
763         struct ctdb_db_context *ctdb_db, *tmp_db;
764         int ret;
765         struct TDB_DATA key;
766         int tdb_flags;
767         int mode = 0600;
768         int remaining_tries = 0;
769
770         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771         CTDB_NO_MEMORY(ctdb, ctdb_db);
772
773         ctdb_db->ctdb = ctdb;
774         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
776
777         key.dsize = strlen(db_name)+1;
778         key.dptr  = discard_const(db_name);
779         ctdb_db->db_id = ctdb_hash(&key);
780         ctdb_db->db_flags = db_flags;
781
782         if (ctdb_db_volatile(ctdb_db)) {
783                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784                 if (ctdb_db->delete_queue == NULL) {
785                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
786                 }
787
788                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
789         }
790
791         /* check for hash collisions */
792         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793                 if (tmp_db->db_id == ctdb_db->db_id) {
794                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795                                  tmp_db->db_id, db_name, tmp_db->db_name));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (ctdb_db_persistent(ctdb_db)) {
802                 if (unhealthy_reason) {
803                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804                                                             unhealthy_reason, 0);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807                                                    ctdb_db->db_name, unhealthy_reason, ret));
808                                 talloc_free(ctdb_db);
809                                 return -1;
810                         }
811                 }
812
813                 if (ctdb->max_persistent_check_errors > 0) {
814                         remaining_tries = 1;
815                 }
816                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
817                         remaining_tries = 0;
818                 }
819
820                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
821                 if (ret != 0) {
822                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823                                    ctdb_db->db_name, ret));
824                         talloc_free(ctdb_db);
825                         return -1;
826                 }
827         }
828
829         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
832                 talloc_free(ctdb_db);
833                 return -1;
834         }
835
836         if (ctdb_db->unhealthy_reason) {
837                 /* this is just a warning, but we want that in the log file! */
838                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840         }
841
842         /* open the database */
843         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844                                            ctdb_db_persistent(ctdb_db) ?
845                                                 ctdb->db_directory_persistent :
846                                                 ctdb->db_directory,
847                                            db_name, ctdb->pnn);
848
849         tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
850                                       ctdb->tunable.mutex_enabled);
851
852 again:
853         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
854                                       ctdb->tunable.database_hash_size, 
855                                       tdb_flags, 
856                                       O_CREAT|O_RDWR, mode);
857         if (ctdb_db->ltdb == NULL) {
858                 struct stat st;
859                 int saved_errno = errno;
860
861                 if (! ctdb_db_persistent(ctdb_db)) {
862                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
863                                           ctdb_db->db_path,
864                                           saved_errno,
865                                           strerror(saved_errno)));
866                         talloc_free(ctdb_db);
867                         return -1;
868                 }
869
870                 if (remaining_tries == 0) {
871                         DEBUG(DEBUG_CRIT,(__location__
872                                           "Failed to open persistent tdb '%s': %d - %s\n",
873                                           ctdb_db->db_path,
874                                           saved_errno,
875                                           strerror(saved_errno)));
876                         talloc_free(ctdb_db);
877                         return -1;
878                 }
879
880                 ret = stat(ctdb_db->db_path, &st);
881                 if (ret != 0) {
882                         DEBUG(DEBUG_CRIT,(__location__
883                                           "Failed to open persistent tdb '%s': %d - %s\n",
884                                           ctdb_db->db_path,
885                                           saved_errno,
886                                           strerror(saved_errno)));
887                         talloc_free(ctdb_db);
888                         return -1;
889                 }
890
891                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
892                 if (ret != 0) {
893                         DEBUG(DEBUG_CRIT,(__location__
894                                           "Failed to open persistent tdb '%s': %d - %s\n",
895                                           ctdb_db->db_path,
896                                           saved_errno,
897                                           strerror(saved_errno)));
898                         talloc_free(ctdb_db);
899                         return -1;
900                 }
901
902                 remaining_tries--;
903                 mode = st.st_mode;
904                 goto again;
905         }
906
907         if (!ctdb_db_persistent(ctdb_db)) {
908                 ctdb_check_db_empty(ctdb_db);
909         } else {
910                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
911                 if (ret != 0) {
912                         int fd;
913                         struct stat st;
914
915                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
916                                           ctdb_db->db_path, ret,
917                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
918                         if (remaining_tries == 0) {
919                                 talloc_free(ctdb_db);
920                                 return -1;
921                         }
922
923                         fd = tdb_fd(ctdb_db->ltdb->tdb);
924                         ret = fstat(fd, &st);
925                         if (ret != 0) {
926                                 DEBUG(DEBUG_CRIT,(__location__
927                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
928                                                   ctdb_db->db_path,
929                                                   errno,
930                                                   strerror(errno)));
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         /* close the TDB */
936                         talloc_free(ctdb_db->ltdb);
937                         ctdb_db->ltdb = NULL;
938
939                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
940                         if (ret != 0) {
941                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
942                                                   ctdb_db->db_path));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         remaining_tries--;
948                         mode = st.st_mode;
949                         goto again;
950                 }
951         }
952
953         /* remember the flags the client has specified */
954         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
955
956
957         /* set up a rb tree we can use to track which records we have a 
958            fetch-lock in-flight for so we can defer any additional calls
959            for the same record.
960          */
961         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
962         if (ctdb_db->deferred_fetch == NULL) {
963                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
964                 talloc_free(ctdb_db);
965                 return -1;
966         }
967
968         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
969         if (ctdb_db->defer_dmaster == NULL) {
970                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
971                                   ctdb_db->db_name));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         DLIST_ADD(ctdb->db_list, ctdb_db);
977
978         /* setting this can help some high churn databases */
979         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
980
981         /* 
982            all databases support the "null" function. we need this in
983            order to do forced migration of records
984         */
985         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
986         if (ret != 0) {
987                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988                 talloc_free(ctdb_db);
989                 return -1;
990         }
991
992         /* 
993            all databases support the "fetch" function. we need this
994            for efficient Samba3 ctdb fetch
995         */
996         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
997         if (ret != 0) {
998                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999                 talloc_free(ctdb_db);
1000                 return -1;
1001         }
1002
1003         /* 
1004            all databases support the "fetch_with_header" function. we need this
1005            for efficient readonly record fetches
1006         */
1007         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010                 talloc_free(ctdb_db);
1011                 return -1;
1012         }
1013
1014         ret = ctdb_vacuum_init(ctdb_db);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017                                   "database '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022         ret = ctdb_migration_init(ctdb_db);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_ERR,
1025                       ("Failed to setup migration tracking for db '%s'\n",
1026                        ctdb_db->db_name));
1027                 talloc_free(ctdb_db);
1028                 return -1;
1029         }
1030
1031         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1032                            &ctdb_db->lock_log);
1033         if (ret != 0) {
1034                 DEBUG(DEBUG_ERR,
1035                       ("Failed to setup lock logging for db '%s'\n",
1036                        ctdb_db->db_name));
1037                 talloc_free(ctdb_db);
1038                 return -1;
1039         }
1040
1041         ctdb_db->generation = ctdb->vnn_map->generation;
1042
1043         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1044                             ctdb_db->db_path, tdb_flags));
1045
1046         /* success */
1047         return 0;
1048 }
1049
1050
/*
  state for one DB attach control whose processing has been deferred
 */
struct ctdb_deferred_attach_context {
        /* linkage for the ctdb->deferred_attach list */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context whose deferred_attach list this entry is on */
        struct ctdb_context *ctdb;

        /* the deferred control packet */
        struct ctdb_req_control_old *c;
};
1056
1057
1058 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1059 {
1060         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1061
1062         return 0;
1063 }
1064
1065 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1066                                          struct tevent_timer *te,
1067                                          struct timeval t, void *private_data)
1068 {
1069         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1070         struct ctdb_context *ctdb = da_ctx->ctdb;
1071
1072         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1073         talloc_free(da_ctx);
1074 }
1075
1076 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1077                                           struct tevent_timer *te,
1078                                           struct timeval t, void *private_data)
1079 {
1080         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1081         struct ctdb_context *ctdb = da_ctx->ctdb;
1082
1083         /* This talloc-steals the packet ->c */
1084         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1085         talloc_free(da_ctx);
1086 }
1087
1088 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1089 {
1090         struct ctdb_deferred_attach_context *da_ctx;
1091
1092         /* call it from the main event loop as soon as the current event 
1093            finishes.
1094          */
1095         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1096                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1097                 tevent_add_timer(ctdb->ev, da_ctx,
1098                                  timeval_current_ofs(1,0),
1099                                  ctdb_deferred_attach_callback, da_ctx);
1100         }
1101
1102         return 0;
1103 }
1104
1105 /*
1106   a client has asked to attach a new database
1107  */
1108 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1109                                TDB_DATA *outdata,
1110                                uint8_t db_flags, uint32_t client_id,
1111                                struct ctdb_req_control_old *c,
1112                                bool *async_reply)
1113 {
1114         const char *db_name = (const char *)indata.dptr;
1115         struct ctdb_db_context *db;
1116         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1117         struct ctdb_client *client = NULL;
1118         uint32_t opcode;
1119
1120         if (ctdb->tunable.allow_client_db_attach == 0) {
1121                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1122                                   "AllowClientDBAccess == 0\n", db_name));
1123                 return -1;
1124         }
1125
1126         /* don't allow any local clients to attach while we are in recovery mode
1127          * except for the recovery daemon.
1128          * allow all attach from the network since these are always from remote
1129          * recovery daemons.
1130          */
1131         if (client_id != 0) {
1132                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1133         }
1134         if (client != NULL) {
1135                 /* If the node is inactive it is not part of the cluster
1136                    and we should not allow clients to attach to any
1137                    databases
1138                 */
1139                 if (node->flags & NODE_FLAGS_INACTIVE) {
1140                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1141                         return -1;
1142                 }
1143
1144                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1145                     client->pid != ctdb->recoverd_pid &&
1146                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1147                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1148
1149                         if (da_ctx == NULL) {
1150                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1151                                 return -1;
1152                         }
1153
1154                         da_ctx->ctdb = ctdb;
1155                         da_ctx->c = talloc_steal(da_ctx, c);
1156                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1157                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1158
1159                         tevent_add_timer(ctdb->ev, da_ctx,
1160                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1161                                          ctdb_deferred_attach_timeout, da_ctx);
1162
1163                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1164                         *async_reply = true;
1165                         return 0;
1166                 }
1167         }
1168
1169         /* see if we already have this name */
1170         db = ctdb_db_handle(ctdb, db_name);
1171         if (db) {
1172                 if ((db->db_flags & db_flags) != db_flags) {
1173                         DEBUG(DEBUG_ERR,
1174                               ("Error: Failed to re-attach with 0x%x flags,"
1175                                " database has 0x%x flags\n", db_flags,
1176                                db->db_flags));
1177                         return -1;
1178                 }
1179                 outdata->dptr  = (uint8_t *)&db->db_id;
1180                 outdata->dsize = sizeof(db->db_id);
1181                 return 0;
1182         }
1183
1184         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1185                 return -1;
1186         }
1187
1188         db = ctdb_db_handle(ctdb, db_name);
1189         if (!db) {
1190                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1191                 return -1;
1192         }
1193
1194         outdata->dptr  = (uint8_t *)&db->db_id;
1195         outdata->dsize = sizeof(db->db_id);
1196
1197         /* Try to ensure it's locked in mem */
1198         lockdown_memory(ctdb->valgrinding);
1199
1200         if (ctdb_db_persistent(db)) {
1201                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1202         } else if (ctdb_db_replicated(db)) {
1203                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1204         } else {
1205                 opcode = CTDB_CONTROL_DB_ATTACH;
1206         }
1207
1208         /* tell all the other nodes about this database */
1209         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1210                                  0, CTDB_CTRL_FLAG_NOREPLY,
1211                                  indata, NULL, NULL);
1212
1213         /* success */
1214         return 0;
1215 }
1216
1217 /*
1218  * a client has asked to detach from a database
1219  */
1220 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1221                                uint32_t client_id)
1222 {
1223         uint32_t db_id;
1224         struct ctdb_db_context *ctdb_db;
1225         struct ctdb_client *client = NULL;
1226
1227         db_id = *(uint32_t *)indata.dptr;
1228         ctdb_db = find_ctdb_db(ctdb, db_id);
1229         if (ctdb_db == NULL) {
1230                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1231                                   db_id));
1232                 return -1;
1233         }
1234
1235         if (ctdb->tunable.allow_client_db_attach == 1) {
1236                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1237                                   "Clients are allowed access to databases "
1238                                   "(AllowClientDBAccess == 1)\n",
1239                                   ctdb_db->db_name));
1240                 return -1;
1241         }
1242
1243         if (! ctdb_db_volatile(ctdb_db)) {
1244                 DEBUG(DEBUG_ERR,
1245                       ("Detaching non-volatile database %s denied\n",
1246                        ctdb_db->db_name));
1247                 return -1;
1248         }
1249
1250         /* Cannot detach from database when in recovery */
1251         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1252                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1253                 return -1;
1254         }
1255
1256         /* If a control comes from a client, then broadcast it to all nodes.
1257          * Do the actual detach only if the control comes from other daemons.
1258          */
1259         if (client_id != 0) {
1260                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1261                 if (client != NULL) {
1262                         /* forward the control to all the nodes */
1263                         ctdb_daemon_send_control(ctdb,
1264                                                  CTDB_BROADCAST_CONNECTED, 0,
1265                                                  CTDB_CONTROL_DB_DETACH, 0,
1266                                                  CTDB_CTRL_FLAG_NOREPLY,
1267                                                  indata, NULL, NULL);
1268                         return 0;
1269                 }
1270                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1271                                   "for database '%s'\n", ctdb_db->db_name));
1272                 return -1;
1273         }
1274
1275         /* Detach database from recoverd */
1276         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1277                                      CTDB_SRVID_DETACH_DATABASE,
1278                                      indata) != 0) {
1279                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1280                 return -1;
1281         }
1282
1283         /* Disable vacuuming and drop all vacuuming data */
1284         talloc_free(ctdb_db->vacuum_handle);
1285         talloc_free(ctdb_db->delete_queue);
1286
1287         /* Terminate any deferred fetch */
1288         talloc_free(ctdb_db->deferred_fetch);
1289
1290         /* Terminate any traverses */
1291         while (ctdb_db->traverse) {
1292                 talloc_free(ctdb_db->traverse);
1293         }
1294
1295         /* Terminate any revokes */
1296         while (ctdb_db->revokechild_active) {
1297                 talloc_free(ctdb_db->revokechild_active);
1298         }
1299
1300         /* Free readonly tracking database */
1301         if (ctdb_db_readonly(ctdb_db)) {
1302                 talloc_free(ctdb_db->rottdb);
1303         }
1304
1305         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1306
1307         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1308                              ctdb_db->db_name));
1309         talloc_free(ctdb_db);
1310
1311         return 0;
1312 }
1313
1314 /*
1315   attach to all existing persistent databases
1316  */
1317 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1318                                   const char *unhealthy_reason)
1319 {
1320         DIR *d;
1321         struct dirent *de;
1322
1323         /* open the persistent db directory and scan it for files */
1324         d = opendir(ctdb->db_directory_persistent);
1325         if (d == NULL) {
1326                 return 0;
1327         }
1328
1329         while ((de=readdir(d))) {
1330                 char *p, *s, *q;
1331                 size_t len = strlen(de->d_name);
1332                 uint32_t node;
1333                 int invalid_name = 0;
1334                 
1335                 s = talloc_strdup(ctdb, de->d_name);
1336                 if (s == NULL) {
1337                         closedir(d);
1338                         CTDB_NO_MEMORY(ctdb, s);
1339                 }
1340
1341                 /* only accept names ending in .tdb */
1342                 p = strstr(s, ".tdb.");
1343                 if (len < 7 || p == NULL) {
1344                         talloc_free(s);
1345                         continue;
1346                 }
1347
1348                 /* only accept names ending with .tdb. and any number of digits */
1349                 q = p+5;
1350                 while (*q != 0 && invalid_name == 0) {
1351                         if (!isdigit(*q++)) {
1352                                 invalid_name = 1;
1353                         }
1354                 }
1355                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1356                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1357                         talloc_free(s);
1358                         continue;
1359                 }
1360                 p[4] = 0;
1361
1362                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1363                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1364                         closedir(d);
1365                         talloc_free(s);
1366                         return -1;
1367                 }
1368
1369                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1370
1371                 talloc_free(s);
1372         }
1373         closedir(d);
1374         return 0;
1375 }
1376
1377 int ctdb_attach_databases(struct ctdb_context *ctdb)
1378 {
1379         int ret;
1380         char *persistent_health_path = NULL;
1381         char *unhealthy_reason = NULL;
1382         bool first_try = true;
1383
1384         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1385                                                  ctdb->db_directory_state,
1386                                                  PERSISTENT_HEALTH_TDB,
1387                                                  ctdb->pnn);
1388         if (persistent_health_path == NULL) {
1389                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1390                 return -1;
1391         }
1392
1393 again:
1394
1395         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1396                                                    0, TDB_DISALLOW_NESTING,
1397                                                    O_CREAT | O_RDWR, 0600);
1398         if (ctdb->db_persistent_health == NULL) {
1399                 struct tdb_wrap *tdb;
1400
1401                 if (!first_try) {
1402                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1403                                           persistent_health_path,
1404                                           errno,
1405                                           strerror(errno)));
1406                         talloc_free(persistent_health_path);
1407                         talloc_free(unhealthy_reason);
1408                         return -1;
1409                 }
1410                 first_try = false;
1411
1412                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1413                                                    persistent_health_path,
1414                                                    "was cleared after a failure",
1415                                                    "manual verification needed");
1416                 if (unhealthy_reason == NULL) {
1417                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1418                         talloc_free(persistent_health_path);
1419                         return -1;
1420                 }
1421
1422                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1423                                   persistent_health_path));
1424                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1425                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1426                                     O_CREAT | O_RDWR, 0600);
1427                 if (tdb) {
1428                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1429                                           persistent_health_path,
1430                                           errno,
1431                                           strerror(errno)));
1432                         talloc_free(persistent_health_path);
1433                         talloc_free(unhealthy_reason);
1434                         return -1;
1435                 }
1436
1437                 talloc_free(tdb);
1438                 goto again;
1439         }
1440         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1441         if (ret != 0) {
1442                 struct tdb_wrap *tdb;
1443
1444                 talloc_free(ctdb->db_persistent_health);
1445                 ctdb->db_persistent_health = NULL;
1446
1447                 if (!first_try) {
1448                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1449                                           persistent_health_path));
1450                         talloc_free(persistent_health_path);
1451                         talloc_free(unhealthy_reason);
1452                         return -1;
1453                 }
1454                 first_try = false;
1455
1456                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1457                                                    persistent_health_path,
1458                                                    "was cleared after a failure",
1459                                                    "manual verification needed");
1460                 if (unhealthy_reason == NULL) {
1461                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1462                         talloc_free(persistent_health_path);
1463                         return -1;
1464                 }
1465
1466                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1467                                   persistent_health_path));
1468                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1469                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1470                                     O_CREAT | O_RDWR, 0600);
1471                 if (tdb) {
1472                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1473                                           persistent_health_path,
1474                                           errno,
1475                                           strerror(errno)));
1476                         talloc_free(persistent_health_path);
1477                         talloc_free(unhealthy_reason);
1478                         return -1;
1479                 }
1480
1481                 talloc_free(tdb);
1482                 goto again;
1483         }
1484         talloc_free(persistent_health_path);
1485
1486         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1487         talloc_free(unhealthy_reason);
1488         if (ret != 0) {
1489                 return ret;
1490         }
1491
1492         return 0;
1493 }
1494
1495 /*
1496   called when a broadcast seqnum update comes in
1497  */
1498 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1499 {
1500         struct ctdb_db_context *ctdb_db;
1501         if (srcnode == ctdb->pnn) {
1502                 /* don't update ourselves! */
1503                 return 0;
1504         }
1505
1506         ctdb_db = find_ctdb_db(ctdb, db_id);
1507         if (!ctdb_db) {
1508                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1509                 return -1;
1510         }
1511
1512         if (ctdb_db->unhealthy_reason) {
1513                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1514                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1515                 return -1;
1516         }
1517
1518         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1519         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1520         return 0;
1521 }
1522
1523 /*
1524   timer to check for seqnum changes in a ltdb and propogate them
1525  */
1526 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1527                                    struct tevent_timer *te,
1528                                    struct timeval t, void *p)
1529 {
1530         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1531         struct ctdb_context *ctdb = ctdb_db->ctdb;
1532         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1533         if (new_seqnum != ctdb_db->seqnum) {
1534                 /* something has changed - propogate it */
1535                 TDB_DATA data;
1536                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1537                 data.dsize = sizeof(uint32_t);
1538                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1539                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1540                                          data, NULL, NULL);             
1541         }
1542         ctdb_db->seqnum = new_seqnum;
1543
1544         /* setup a new timer */
1545         ctdb_db->seqnum_update =
1546                 tevent_add_timer(ctdb->ev, ctdb_db,
1547                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1548                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1549                                  ctdb_ltdb_seqnum_check, ctdb_db);
1550 }
1551
1552 /*
1553   enable seqnum handling on this db
1554  */
1555 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1556 {
1557         struct ctdb_db_context *ctdb_db;
1558         ctdb_db = find_ctdb_db(ctdb, db_id);
1559         if (!ctdb_db) {
1560                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1561                 return -1;
1562         }
1563
1564         if (ctdb_db->seqnum_update == NULL) {
1565                 ctdb_db->seqnum_update = tevent_add_timer(
1566                         ctdb->ev, ctdb_db,
1567                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1568                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1569                         ctdb_ltdb_seqnum_check, ctdb_db);
1570         }
1571
1572         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1573         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1574         return 0;
1575 }
1576
1577 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1578 {
1579         if (ctdb_db_sticky(ctdb_db)) {
1580                 return 0;
1581         }
1582
1583         if (! ctdb_db_volatile(ctdb_db)) {
1584                 DEBUG(DEBUG_ERR,
1585                       ("Non-volatile databases do not support sticky flag\n"));
1586                 return -1;
1587         }
1588
1589         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1590
1591         ctdb_db_set_sticky(ctdb_db);
1592
1593         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1594
1595         return 0;
1596 }
1597
1598 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1599 {
1600         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1601         int i;
1602
1603         for (i=0; i<MAX_HOT_KEYS; i++) {
1604                 if (s->hot_keys[i].key.dsize > 0) {
1605                         talloc_free(s->hot_keys[i].key.dptr);
1606                 }
1607         }
1608
1609         ZERO_STRUCT(ctdb_db->statistics);
1610 }
1611
1612 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1613                                 uint32_t db_id,
1614                                 TDB_DATA *outdata)
1615 {
1616         struct ctdb_db_context *ctdb_db;
1617         struct ctdb_db_statistics_old *stats;
1618         int i;
1619         int len;
1620         char *ptr;
1621
1622         ctdb_db = find_ctdb_db(ctdb, db_id);
1623         if (!ctdb_db) {
1624                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1625                 return -1;
1626         }
1627
1628         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1629         for (i = 0; i < MAX_HOT_KEYS; i++) {
1630                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1631         }
1632
1633         stats = talloc_size(outdata, len);
1634         if (stats == NULL) {
1635                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1636                 return -1;
1637         }
1638
1639         memcpy(stats, &ctdb_db->statistics,
1640                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1641
1642         stats->num_hot_keys = MAX_HOT_KEYS;
1643
1644         ptr = &stats->hot_keys_wire[0];
1645         for (i = 0; i < MAX_HOT_KEYS; i++) {
1646                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1647                        ctdb_db->statistics.hot_keys[i].key.dsize);
1648                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1649         }
1650
1651         outdata->dptr  = (uint8_t *)stats;
1652         outdata->dsize = len;
1653
1654         return 0;
1655 }