2d1daafb1575ef208f4b530870aa64d85ecbd1c9
[sharpe/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #include "server/ctdb_config.h"
45
46 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47
48 /**
49  * write a record to a normal database
50  *
51  * This is the server-variant of the ctdb_ltdb_store function.
52  * It contains logic to determine whether a record should be
53  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
54  * controls to the local ctdb daemon if apporpriate.
55  */
56 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
57                                   TDB_DATA key,
58                                   struct ctdb_ltdb_header *header,
59                                   TDB_DATA data)
60 {
61         struct ctdb_context *ctdb = ctdb_db->ctdb;
62         TDB_DATA rec[2];
63         uint32_t hsize = sizeof(struct ctdb_ltdb_header);
64         int ret;
65         bool seqnum_suppressed = false;
66         bool keep = false;
67         bool schedule_for_deletion = false;
68         bool remove_from_delete_queue = false;
69         uint32_t lmaster;
70
71         if (ctdb->flags & CTDB_FLAG_TORTURE) {
72                 TDB_DATA old;
73                 struct ctdb_ltdb_header *h2;
74
75                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
76                 h2 = (struct ctdb_ltdb_header *)old.dptr;
77                 if (old.dptr != NULL &&
78                     old.dsize >= hsize &&
79                     h2->rsn > header->rsn) {
80                         DEBUG(DEBUG_ERR,
81                               ("RSN regression! %"PRIu64" %"PRIu64"\n",
82                                h2->rsn, header->rsn));
83                 }
84                 if (old.dptr) {
85                         free(old.dptr);
86                 }
87         }
88
89         if (ctdb->vnn_map == NULL) {
90                 /*
91                  * Called from a client: always store the record
92                  * Also don't call ctdb_lmaster since it uses the vnn_map!
93                  */
94                 keep = true;
95                 goto store;
96         }
97
98         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99
100         /*
101          * If we migrate an empty record off to another node
102          * and the record has not been migrated with data,
103          * delete the record instead of storing the empty record.
104          */
105         if (data.dsize != 0) {
106                 keep = true;
107         } else if (header->flags & CTDB_REC_RO_FLAGS) {
108                 keep = true;
109         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
110                 /*
111                  * The record is not created by the client but
112                  * automatically by the ctdb_ltdb_fetch logic that
113                  * creates a record with an initial header in the
114                  * ltdb before trying to migrate the record from
115                  * the current lmaster. Keep it instead of trying
116                  * to delete the non-existing record...
117                  */
118                 keep = true;
119                 schedule_for_deletion = true;
120         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
121                 keep = true;
122         } else if (ctdb_db->ctdb->pnn == lmaster) {
123                 /*
124                  * If we are lmaster, then we usually keep the record.
125                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126                  * and the record is empty and has never been migrated
127                  * with data, then we should delete it instead of storing it.
128                  * This is part of the vacuuming process.
129                  *
130                  * The reason that we usually need to store even empty records
131                  * on the lmaster is that a client operating directly on the
132                  * lmaster (== dmaster) expects the local copy of the record to
133                  * exist after successful ctdb migrate call. If the record does
134                  * not exist, the client goes into a migrate loop and eventually
135                  * fails. So storing the empty record makes sure that we do not
136                  * need to change the client code.
137                  */
138                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
139                         keep = true;
140                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141                         keep = true;
142                 }
143         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
144                 keep = true;
145         }
146
147         if (keep) {
148                 if (ctdb_db_volatile(ctdb_db) &&
149                     (ctdb_db->ctdb->pnn == header->dmaster) &&
150                     !(header->flags & CTDB_REC_RO_FLAGS))
151                 {
152                         header->rsn++;
153
154                         if (data.dsize == 0) {
155                                 schedule_for_deletion = true;
156                         }
157                 }
158                 remove_from_delete_queue = !schedule_for_deletion;
159         }
160
161 store:
162         /*
163          * The VACUUM_MIGRATED flag is only set temporarily for
164          * the above logic when the record was retrieved by a
165          * VACUUM_MIGRATE call and should not be stored in the
166          * database.
167          *
168          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169          * and there are two cases in which the corresponding record
170          * is stored in the local database:
171          * 1. The record has been migrated with data in the past
172          *    (the MIGRATED_WITH_DATA record flag is set).
173          * 2. The record has been filled with data again since it
174          *    had been submitted in the VACUUM_FETCH message to the
175          *    lmaster.
176          * For such records it is important to not store the
177          * VACUUM_MIGRATED flag in the database.
178          */
179         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180
181         /*
182          * Similarly, clear the AUTOMATIC flag which should not enter
183          * the local database copy since this would require client
184          * modifications to clear the flag when the client stores
185          * the record.
186          */
187         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
188
189         rec[0].dsize = hsize;
190         rec[0].dptr = (uint8_t *)header;
191
192         rec[1].dsize = data.dsize;
193         rec[1].dptr = data.dptr;
194
195         /* Databases with seqnum updates enabled only get their seqnum
196            changes when/if we modify the data */
197         if (ctdb_db->seqnum_update != NULL) {
198                 TDB_DATA old;
199                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
200
201                 if ((old.dsize == hsize + data.dsize) &&
202                     memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204                         seqnum_suppressed = true;
205                 }
206                 if (old.dptr != NULL) {
207                         free(old.dptr);
208                 }
209         }
210
211         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
212                             ctdb_db->db_name,
213                             keep?"storing":"deleting",
214                             ctdb_hash(&key)));
215
216         if (keep) {
217                 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
218         } else {
219                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
220         }
221
222         if (ret != 0) {
223                 int lvl = DEBUG_ERR;
224
225                 if (keep == false &&
226                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
227                 {
228                         lvl = DEBUG_DEBUG;
229                 }
230
231                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232                             "%d - %s\n",
233                             ctdb_db->db_name,
234                             keep?"store":"delete", ret,
235                             tdb_errorstr(ctdb_db->ltdb->tdb)));
236
237                 schedule_for_deletion = false;
238                 remove_from_delete_queue = false;
239         }
240         if (seqnum_suppressed) {
241                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242         }
243
244         if (schedule_for_deletion) {
245                 int ret2;
246                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
247                 if (ret2 != 0) {
248                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
249                 }
250         }
251
252         if (remove_from_delete_queue) {
253                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
254         }
255
256         return ret;
257 }
258
/*
  state kept for a ctdb request that is waiting for a record chainlock
  and will be handed back to the packet receive function later
 */
struct lock_fetch_state {
	/* daemon and database the request belongs to */
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* receive function to re-submit the packet to, and its argument */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	void *recv_context;

	/* database generation at the time the request was queued */
	uint32_t generation;
	/* when true, re-submit the packet even if the generation changed */
	bool ignore_generation;
};
268
269 /*
270   called when we should retry the operation
271  */
272 static void lock_fetch_callback(void *p, bool locked)
273 {
274         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275         if (!state->ignore_generation &&
276             state->generation != state->ctdb_db->generation) {
277                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278                 talloc_free(state->hdr);
279                 return;
280         }
281         state->recv_pkt(state->recv_context, state->hdr);
282         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
283 }
284
285
286 /*
287   do a non-blocking ltdb_lock, deferring this ctdb request until we
288   have the chainlock
289
290   It does the following:
291
292    1) tries to get the chainlock. If it succeeds, then it returns 0
293
294    2) if it fails to get a chainlock immediately then it sets up a
295    non-blocking chainlock via ctdb_lock_record, and when it gets the
296    chainlock it re-submits this ctdb request to the main packet
297    receive function.
298
299    This effectively queues all ctdb requests that cannot be
300    immediately satisfied until it can get the lock. This means that
301    the main ctdb daemon will not block waiting for a chainlock held by
302    a client
303
304    There are 3 possible return values:
305
306        0:    means that it got the lock immediately.
307       -1:    means that it failed to get the lock, and won't retry
308       -2:    means that it failed to get the lock immediately, but will retry
309  */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
311                            TDB_DATA key, struct ctdb_req_header *hdr,
312                            void (*recv_pkt)(void *, struct ctdb_req_header *),
313                            void *recv_context, bool ignore_generation)
314 {
315         int ret;
316         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317         struct lock_request *lreq;
318         struct lock_fetch_state *state;
319         
320         ret = tdb_chainlock_nonblock(tdb, key);
321
322         if (ret != 0 &&
323             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324                 /* a hard failure - don't try again */
325                 return -1;
326         }
327
328         /* when torturing, ensure we test the contended path */
329         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330             random() % 5 == 0) {
331                 ret = -1;
332                 tdb_chainunlock(tdb, key);
333         }
334
335         /* first the non-contended path */
336         if (ret == 0) {
337                 return 0;
338         }
339
340         state = talloc(hdr, struct lock_fetch_state);
341         state->ctdb = ctdb_db->ctdb;
342         state->ctdb_db = ctdb_db;
343         state->hdr = hdr;
344         state->recv_pkt = recv_pkt;
345         state->recv_context = recv_context;
346         state->generation = ctdb_db->generation;
347         state->ignore_generation = ignore_generation;
348
349         /* now the contended path */
350         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
351         if (lreq == NULL) {
352                 return -1;
353         }
354
355         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356            so it won't be freed yet */
357         talloc_steal(state, hdr);
358
359         /* now tell the caller than we will retry asynchronously */
360         return -2;
361 }
362
363 /*
364   a varient of ctdb_ltdb_lock_requeue that also fetches the record
365  */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
367                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
368                                  struct ctdb_req_header *hdr, TDB_DATA *data,
369                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
370                                  void *recv_context, bool ignore_generation)
371 {
372         int ret;
373
374         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
375                                      recv_context, ignore_generation);
376         if (ret == 0) {
377                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378                 if (ret != 0) {
379                         int uret;
380                         uret = ctdb_ltdb_unlock(ctdb_db, key);
381                         if (uret != 0) {
382                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
383                         }
384                 }
385         }
386         return ret;
387 }
388
389
390 /*
391   paraoid check to see if the db is empty
392  */
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
394 {
395         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396         int count = tdb_traverse_read(tdb, NULL, NULL);
397         if (count != 0) {
398                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
399                          ctdb_db->db_path));
400                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
401         }
402 }
403
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405                                 struct ctdb_db_context *ctdb_db)
406 {
407         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
408         char *old;
409         char *reason = NULL;
410         TDB_DATA key;
411         TDB_DATA val;
412
413         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414         key.dsize = strlen(ctdb_db->db_name);
415
416         old = ctdb_db->unhealthy_reason;
417         ctdb_db->unhealthy_reason = NULL;
418
419         val = tdb_fetch(tdb, key);
420         if (val.dsize > 0) {
421                 reason = talloc_strndup(ctdb_db,
422                                         (const char *)val.dptr,
423                                         val.dsize);
424                 if (reason == NULL) {
425                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
426                                            (int)val.dsize));
427                         ctdb_db->unhealthy_reason = old;
428                         free(val.dptr);
429                         return -1;
430                 }
431         }
432
433         if (val.dptr) {
434                 free(val.dptr);
435         }
436
437         talloc_free(old);
438         ctdb_db->unhealthy_reason = reason;
439         return 0;
440 }
441
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443                                   struct ctdb_db_context *ctdb_db,
444                                   const char *given_reason,/* NULL means healthy */
445                                   int num_healthy_nodes)
446 {
447         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
448         int ret;
449         TDB_DATA key;
450         TDB_DATA val;
451         char *new_reason = NULL;
452         char *old_reason = NULL;
453
454         ret = tdb_transaction_start(tdb);
455         if (ret != 0) {
456                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
458                 return -1;
459         }
460
461         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
462         if (ret != 0) {
463                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464                                    ctdb_db->db_name, ret));
465                 return -1;
466         }
467         old_reason = ctdb_db->unhealthy_reason;
468
469         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470         key.dsize = strlen(ctdb_db->db_name);
471
472         if (given_reason) {
473                 new_reason = talloc_strdup(ctdb_db, given_reason);
474                 if (new_reason == NULL) {
475                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
476                                           given_reason));
477                         return -1;
478                 }
479         } else if (old_reason && num_healthy_nodes == 0) {
480                 /*
481                  * If the reason indicates ok, but there where no healthy nodes
482                  * available, that it means, we have not recovered valid content
483                  * of the db. So if there's an old reason, prefix it with
484                  * "NO-HEALTHY-NODES - "
485                  */
486                 const char *prefix;
487
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
490                 if (ret != 0) {
491                         prefix = _TMP_PREFIX;
492                 } else {
493                         prefix = "";
494                 }
495                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
496                                          prefix, old_reason);
497                 if (new_reason == NULL) {
498                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499                                           prefix, old_reason));
500                         return -1;
501                 }
502 #undef _TMP_PREFIX
503         }
504
505         if (new_reason) {
506                 val.dptr = discard_const_p(uint8_t, new_reason);
507                 val.dsize = strlen(new_reason);
508
509                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
510                 if (ret != 0) {
511                         tdb_transaction_cancel(tdb);
512                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
514                                            ret, tdb_errorstr(tdb)));
515                         talloc_free(new_reason);
516                         return -1;
517                 }
518                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519                                    ctdb_db->db_name, new_reason));
520         } else if (old_reason) {
521                 ret = tdb_delete(tdb, key);
522                 if (ret != 0) {
523                         tdb_transaction_cancel(tdb);
524                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525                                            tdb_name(tdb), ctdb_db->db_name,
526                                            ret, tdb_errorstr(tdb)));
527                         talloc_free(new_reason);
528                         return -1;
529                 }
530                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
531                                    ctdb_db->db_name));
532         }
533
534         ret = tdb_transaction_commit(tdb);
535         if (ret != TDB_SUCCESS) {
536                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
538                 talloc_free(new_reason);
539                 return -1;
540         }
541
542         talloc_free(old_reason);
543         ctdb_db->unhealthy_reason = new_reason;
544
545         return 0;
546 }
547
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549                                      struct ctdb_db_context *ctdb_db)
550 {
551         time_t now = time(NULL);
552         char *new_path;
553         char *new_reason;
554         int ret;
555         struct tm *tm;
556
557         tm = gmtime(&now);
558
559         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561                                    "%04u%02u%02u%02u%02u%02u.0Z",
562                                    ctdb_db->db_path,
563                                    tm->tm_year+1900, tm->tm_mon+1,
564                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
565                                    tm->tm_sec);
566         if (new_path == NULL) {
567                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
568                 return -1;
569         }
570
571         new_reason = talloc_asprintf(ctdb_db,
572                                      "ERROR - Backup of corrupted TDB in '%s'",
573                                      new_path);
574         if (new_reason == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579         talloc_free(new_reason);
580         if (ret != 0) {
581                 DEBUG(DEBUG_CRIT,(__location__
582                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
583                                  ctdb_db->db_path));
584                 return -1;
585         }
586
587         ret = rename(ctdb_db->db_path, new_path);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591                                   ctdb_db->db_path, new_path,
592                                   errno, strerror(errno)));
593                 talloc_free(new_path);
594                 return -1;
595         }
596
597         DEBUG(DEBUG_CRIT,(__location__
598                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599                          ctdb_db->db_path, new_path));
600         talloc_free(new_path);
601         return 0;
602 }
603
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
605 {
606         struct ctdb_db_context *ctdb_db;
607         int ret;
608         int ok = 0;
609         int fail = 0;
610
611         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
612                 if (!ctdb_db_persistent(ctdb_db)) {
613                         continue;
614                 }
615
616                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
617                 if (ret != 0) {
618                         DEBUG(DEBUG_ALERT,(__location__
619                                            " load persistent health for '%s' failed\n",
620                                            ctdb_db->db_path));
621                         return -1;
622                 }
623
624                 if (ctdb_db->unhealthy_reason == NULL) {
625                         ok++;
626                         DEBUG(DEBUG_INFO,(__location__
627                                    " persistent db '%s' healthy\n",
628                                    ctdb_db->db_path));
629                         continue;
630                 }
631
632                 fail++;
633                 DEBUG(DEBUG_ALERT,(__location__
634                                    " persistent db '%s' unhealthy: %s\n",
635                                    ctdb_db->db_path,
636                                    ctdb_db->unhealthy_reason));
637         }
638         DEBUG(DEBUG_NOTICE,
639               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
640                ok, fail));
641
642         if (fail != 0) {
643                 return -1;
644         }
645
646         return 0;
647 }
648
649
650 /*
651   mark a database - as healthy
652  */
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
654 {
655         uint32_t db_id = *(uint32_t *)indata.dptr;
656         struct ctdb_db_context *ctdb_db;
657         int ret;
658         bool may_recover = false;
659
660         ctdb_db = find_ctdb_db(ctdb, db_id);
661         if (!ctdb_db) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
663                 return -1;
664         }
665
666         if (ctdb_db->unhealthy_reason) {
667                 may_recover = true;
668         }
669
670         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR,(__location__
673                                  " ctdb_update_persistent_health(%s) failed\n",
674                                  ctdb_db->db_name));
675                 return -1;
676         }
677
678         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
680                                   ctdb_db->db_name));
681                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
682         }
683
684         return 0;
685 }
686
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
688                                    TDB_DATA indata,
689                                    TDB_DATA *outdata)
690 {
691         uint32_t db_id = *(uint32_t *)indata.dptr;
692         struct ctdb_db_context *ctdb_db;
693         int ret;
694
695         ctdb_db = find_ctdb_db(ctdb, db_id);
696         if (!ctdb_db) {
697                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
698                 return -1;
699         }
700
701         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
702         if (ret != 0) {
703                 DEBUG(DEBUG_ERR,(__location__
704                                  " ctdb_load_persistent_health(%s) failed\n",
705                                  ctdb_db->db_name));
706                 return -1;
707         }
708
709         *outdata = tdb_null;
710         if (ctdb_db->unhealthy_reason) {
711                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
713         }
714
715         return 0;
716 }
717
718
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
720 {
721         char *ropath;
722
723         if (ctdb_db_readonly(ctdb_db)) {
724                 return 0;
725         }
726
727         if (! ctdb_db_volatile(ctdb_db)) {
728                 DEBUG(DEBUG_ERR,
729                       ("Non-volatile databases do not support readonly flag\n"));
730                 return -1;
731         }
732
733         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
734         if (ropath == NULL) {
735                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
736                 return -1;
737         }
738         ctdb_db->rottdb = tdb_open(ropath, 
739                               ctdb->tunable.database_hash_size, 
740                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
741                               O_CREAT|O_RDWR, 0600);
742         if (ctdb_db->rottdb == NULL) {
743                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
744                 talloc_free(ropath);
745                 return -1;
746         }
747
748         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
749
750         ctdb_db_set_readonly(ctdb_db);
751
752         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
753
754         talloc_free(ropath);
755         return 0;
756 }
757
758 /*
759   attach to a database, handling both persistent and non-persistent databases
760   return 0 on success, -1 on failure
761  */
762 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
763                              uint8_t db_flags, const char *unhealthy_reason)
764 {
765         struct ctdb_db_context *ctdb_db, *tmp_db;
766         int ret;
767         struct TDB_DATA key;
768         int tdb_flags;
769         int mode = 0600;
770         int remaining_tries = 0;
771
772         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773         CTDB_NO_MEMORY(ctdb, ctdb_db);
774
775         ctdb_db->ctdb = ctdb;
776         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
778
779         key.dsize = strlen(db_name)+1;
780         key.dptr  = discard_const(db_name);
781         ctdb_db->db_id = ctdb_hash(&key);
782         ctdb_db->db_flags = db_flags;
783
784         if (ctdb_db_volatile(ctdb_db)) {
785                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786                 if (ctdb_db->delete_queue == NULL) {
787                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788                 }
789
790                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791         }
792
793         /* check for hash collisions */
794         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795                 if (tmp_db->db_id == ctdb_db->db_id) {
796                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797                                  tmp_db->db_id, db_name, tmp_db->db_name));
798                         talloc_free(ctdb_db);
799                         return -1;
800                 }
801         }
802
803         if (ctdb_db_persistent(ctdb_db)) {
804                 if (unhealthy_reason) {
805                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806                                                             unhealthy_reason, 0);
807                         if (ret != 0) {
808                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809                                                    ctdb_db->db_name, unhealthy_reason, ret));
810                                 talloc_free(ctdb_db);
811                                 return -1;
812                         }
813                 }
814
815                 if (ctdb->max_persistent_check_errors > 0) {
816                         remaining_tries = 1;
817                 }
818                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
819                         remaining_tries = 0;
820                 }
821
822                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
823                 if (ret != 0) {
824                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825                                    ctdb_db->db_name, ret));
826                         talloc_free(ctdb_db);
827                         return -1;
828                 }
829         }
830
831         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
834                 talloc_free(ctdb_db);
835                 return -1;
836         }
837
838         if (ctdb_db->unhealthy_reason) {
839                 /* this is just a warning, but we want that in the log file! */
840                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
842         }
843
844         /* open the database */
845         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
846                                            ctdb_db_persistent(ctdb_db) ?
847                                                 ctdb->db_directory_persistent :
848                                                 ctdb->db_directory,
849                                            db_name, ctdb->pnn);
850
851         tdb_flags = ctdb_db_tdb_flags(db_flags,
852                                       ctdb->valgrinding,
853                                       ctdb_config.tdb_mutexes);
854
855 again:
856         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
857                                       ctdb->tunable.database_hash_size, 
858                                       tdb_flags, 
859                                       O_CREAT|O_RDWR, mode);
860         if (ctdb_db->ltdb == NULL) {
861                 struct stat st;
862                 int saved_errno = errno;
863
864                 if (! ctdb_db_persistent(ctdb_db)) {
865                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
866                                           ctdb_db->db_path,
867                                           saved_errno,
868                                           strerror(saved_errno)));
869                         talloc_free(ctdb_db);
870                         return -1;
871                 }
872
873                 if (remaining_tries == 0) {
874                         DEBUG(DEBUG_CRIT,(__location__
875                                           "Failed to open persistent tdb '%s': %d - %s\n",
876                                           ctdb_db->db_path,
877                                           saved_errno,
878                                           strerror(saved_errno)));
879                         talloc_free(ctdb_db);
880                         return -1;
881                 }
882
883                 ret = stat(ctdb_db->db_path, &st);
884                 if (ret != 0) {
885                         DEBUG(DEBUG_CRIT,(__location__
886                                           "Failed to open persistent tdb '%s': %d - %s\n",
887                                           ctdb_db->db_path,
888                                           saved_errno,
889                                           strerror(saved_errno)));
890                         talloc_free(ctdb_db);
891                         return -1;
892                 }
893
894                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
895                 if (ret != 0) {
896                         DEBUG(DEBUG_CRIT,(__location__
897                                           "Failed to open persistent tdb '%s': %d - %s\n",
898                                           ctdb_db->db_path,
899                                           saved_errno,
900                                           strerror(saved_errno)));
901                         talloc_free(ctdb_db);
902                         return -1;
903                 }
904
905                 remaining_tries--;
906                 mode = st.st_mode;
907                 goto again;
908         }
909
910         if (!ctdb_db_persistent(ctdb_db)) {
911                 ctdb_check_db_empty(ctdb_db);
912         } else {
913                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
914                 if (ret != 0) {
915                         int fd;
916                         struct stat st;
917
918                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
919                                           ctdb_db->db_path, ret,
920                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
921                         if (remaining_tries == 0) {
922                                 talloc_free(ctdb_db);
923                                 return -1;
924                         }
925
926                         fd = tdb_fd(ctdb_db->ltdb->tdb);
927                         ret = fstat(fd, &st);
928                         if (ret != 0) {
929                                 DEBUG(DEBUG_CRIT,(__location__
930                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
931                                                   ctdb_db->db_path,
932                                                   errno,
933                                                   strerror(errno)));
934                                 talloc_free(ctdb_db);
935                                 return -1;
936                         }
937
938                         /* close the TDB */
939                         talloc_free(ctdb_db->ltdb);
940                         ctdb_db->ltdb = NULL;
941
942                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
943                         if (ret != 0) {
944                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
945                                                   ctdb_db->db_path));
946                                 talloc_free(ctdb_db);
947                                 return -1;
948                         }
949
950                         remaining_tries--;
951                         mode = st.st_mode;
952                         goto again;
953                 }
954         }
955
956         /* remember the flags the client has specified */
957         tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
958
959
960         /* set up a rb tree we can use to track which records we have a 
961            fetch-lock in-flight for so we can defer any additional calls
962            for the same record.
963          */
964         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
965         if (ctdb_db->deferred_fetch == NULL) {
966                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
967                 talloc_free(ctdb_db);
968                 return -1;
969         }
970
971         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
972         if (ctdb_db->defer_dmaster == NULL) {
973                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
974                                   ctdb_db->db_name));
975                 talloc_free(ctdb_db);
976                 return -1;
977         }
978
979         DLIST_ADD(ctdb->db_list, ctdb_db);
980
981         /* setting this can help some high churn databases */
982         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
983
984         /* 
985            all databases support the "null" function. we need this in
986            order to do forced migration of records
987         */
988         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
989         if (ret != 0) {
990                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
991                 talloc_free(ctdb_db);
992                 return -1;
993         }
994
995         /* 
996            all databases support the "fetch" function. we need this
997            for efficient Samba3 ctdb fetch
998         */
999         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1000         if (ret != 0) {
1001                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1002                 talloc_free(ctdb_db);
1003                 return -1;
1004         }
1005
1006         /* 
1007            all databases support the "fetch_with_header" function. we need this
1008            for efficient readonly record fetches
1009         */
1010         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1013                 talloc_free(ctdb_db);
1014                 return -1;
1015         }
1016
1017         ret = ctdb_vacuum_init(ctdb_db);
1018         if (ret != 0) {
1019                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1020                                   "database '%s'\n", ctdb_db->db_name));
1021                 talloc_free(ctdb_db);
1022                 return -1;
1023         }
1024
1025         ret = ctdb_migration_init(ctdb_db);
1026         if (ret != 0) {
1027                 DEBUG(DEBUG_ERR,
1028                       ("Failed to setup migration tracking for db '%s'\n",
1029                        ctdb_db->db_name));
1030                 talloc_free(ctdb_db);
1031                 return -1;
1032         }
1033
1034         ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1035                            &ctdb_db->lock_log);
1036         if (ret != 0) {
1037                 DEBUG(DEBUG_ERR,
1038                       ("Failed to setup lock logging for db '%s'\n",
1039                        ctdb_db->db_name));
1040                 talloc_free(ctdb_db);
1041                 return -1;
1042         }
1043
1044         ctdb_db->generation = ctdb->vnn_map->generation;
1045
1046         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1047                             ctdb_db->db_path, tdb_flags));
1048
1049         /* success */
1050         return 0;
1051 }
1052
1053
/*
  one database attach request that has been put on hold; entries are
  chained on ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
        /* list linkage used by the DLIST_* macros */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;
        /* daemon context the request belongs to */
        struct ctdb_context *ctdb;
        /* the held-back control packet */
        struct ctdb_req_control_old *c;
};
1059
1060
1061 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1062 {
1063         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1064
1065         return 0;
1066 }
1067
1068 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1069                                          struct tevent_timer *te,
1070                                          struct timeval t, void *private_data)
1071 {
1072         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1073         struct ctdb_context *ctdb = da_ctx->ctdb;
1074
1075         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1076         talloc_free(da_ctx);
1077 }
1078
1079 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1080                                           struct tevent_timer *te,
1081                                           struct timeval t, void *private_data)
1082 {
1083         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1084         struct ctdb_context *ctdb = da_ctx->ctdb;
1085
1086         /* This talloc-steals the packet ->c */
1087         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1088         talloc_free(da_ctx);
1089 }
1090
1091 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1092 {
1093         struct ctdb_deferred_attach_context *da_ctx;
1094
1095         /* call it from the main event loop as soon as the current event 
1096            finishes.
1097          */
1098         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1099                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1100                 tevent_add_timer(ctdb->ev, da_ctx,
1101                                  timeval_current_ofs(1,0),
1102                                  ctdb_deferred_attach_callback, da_ctx);
1103         }
1104
1105         return 0;
1106 }
1107
1108 /*
1109   a client has asked to attach a new database
1110  */
1111 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
1112                                TDB_DATA indata,
1113                                TDB_DATA *outdata,
1114                                uint8_t db_flags,
1115                                uint32_t srcnode,
1116                                uint32_t client_id,
1117                                struct ctdb_req_control_old *c,
1118                                bool *async_reply)
1119 {
1120         const char *db_name = (const char *)indata.dptr;
1121         struct ctdb_db_context *db;
1122         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1123         struct ctdb_client *client = NULL;
1124         uint32_t opcode;
1125
1126         if (ctdb->tunable.allow_client_db_attach == 0) {
1127                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1128                                   "AllowClientDBAccess == 0\n", db_name));
1129                 return -1;
1130         }
1131
1132         /* don't allow any local clients to attach while we are in recovery mode
1133          * except for the recovery daemon.
1134          * allow all attach from the network since these are always from remote
1135          * recovery daemons.
1136          */
1137         if (srcnode == ctdb->pnn && client_id != 0) {
1138                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1139         }
1140         if (client != NULL) {
1141                 /* If the node is inactive it is not part of the cluster
1142                    and we should not allow clients to attach to any
1143                    databases
1144                 */
1145                 if (node->flags & NODE_FLAGS_INACTIVE) {
1146                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1147                         return -1;
1148                 }
1149
1150                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1151                     client->pid != ctdb->recoverd_pid &&
1152                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1153                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1154
1155                         if (da_ctx == NULL) {
1156                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1157                                 return -1;
1158                         }
1159
1160                         da_ctx->ctdb = ctdb;
1161                         da_ctx->c = talloc_steal(da_ctx, c);
1162                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1163                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1164
1165                         tevent_add_timer(ctdb->ev, da_ctx,
1166                                          timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1167                                          ctdb_deferred_attach_timeout, da_ctx);
1168
1169                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1170                         *async_reply = true;
1171                         return 0;
1172                 }
1173         }
1174
1175         /* see if we already have this name */
1176         db = ctdb_db_handle(ctdb, db_name);
1177         if (db) {
1178                 if ((db->db_flags & db_flags) != db_flags) {
1179                         DEBUG(DEBUG_ERR,
1180                               ("Error: Failed to re-attach with 0x%x flags,"
1181                                " database has 0x%x flags\n", db_flags,
1182                                db->db_flags));
1183                         return -1;
1184                 }
1185                 outdata->dptr  = (uint8_t *)&db->db_id;
1186                 outdata->dsize = sizeof(db->db_id);
1187                 return 0;
1188         }
1189
1190         if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
1191                 return -1;
1192         }
1193
1194         db = ctdb_db_handle(ctdb, db_name);
1195         if (!db) {
1196                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1197                 return -1;
1198         }
1199
1200         outdata->dptr  = (uint8_t *)&db->db_id;
1201         outdata->dsize = sizeof(db->db_id);
1202
1203         /* Try to ensure it's locked in mem */
1204         lockdown_memory(ctdb->valgrinding);
1205
1206         if (ctdb_db_persistent(db)) {
1207                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1208         } else if (ctdb_db_replicated(db)) {
1209                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1210         } else {
1211                 opcode = CTDB_CONTROL_DB_ATTACH;
1212         }
1213
1214         /* tell all the other nodes about this database */
1215         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
1216                                  0, CTDB_CTRL_FLAG_NOREPLY,
1217                                  indata, NULL, NULL);
1218
1219         /* success */
1220         return 0;
1221 }
1222
1223 /*
1224  * a client has asked to detach from a database
1225  */
1226 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1227                                uint32_t client_id)
1228 {
1229         uint32_t db_id;
1230         struct ctdb_db_context *ctdb_db;
1231         struct ctdb_client *client = NULL;
1232
1233         db_id = *(uint32_t *)indata.dptr;
1234         ctdb_db = find_ctdb_db(ctdb, db_id);
1235         if (ctdb_db == NULL) {
1236                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1237                                   db_id));
1238                 return -1;
1239         }
1240
1241         if (ctdb->tunable.allow_client_db_attach == 1) {
1242                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1243                                   "Clients are allowed access to databases "
1244                                   "(AllowClientDBAccess == 1)\n",
1245                                   ctdb_db->db_name));
1246                 return -1;
1247         }
1248
1249         if (! ctdb_db_volatile(ctdb_db)) {
1250                 DEBUG(DEBUG_ERR,
1251                       ("Detaching non-volatile database %s denied\n",
1252                        ctdb_db->db_name));
1253                 return -1;
1254         }
1255
1256         /* Cannot detach from database when in recovery */
1257         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1258                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1259                 return -1;
1260         }
1261
1262         /* If a control comes from a client, then broadcast it to all nodes.
1263          * Do the actual detach only if the control comes from other daemons.
1264          */
1265         if (client_id != 0) {
1266                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1267                 if (client != NULL) {
1268                         /* forward the control to all the nodes */
1269                         ctdb_daemon_send_control(ctdb,
1270                                                  CTDB_BROADCAST_CONNECTED, 0,
1271                                                  CTDB_CONTROL_DB_DETACH, 0,
1272                                                  CTDB_CTRL_FLAG_NOREPLY,
1273                                                  indata, NULL, NULL);
1274                         return 0;
1275                 }
1276                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1277                                   "for database '%s'\n", ctdb_db->db_name));
1278                 return -1;
1279         }
1280
1281         /* Detach database from recoverd */
1282         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1283                                      CTDB_SRVID_DETACH_DATABASE,
1284                                      indata) != 0) {
1285                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1286                 return -1;
1287         }
1288
1289         /* Disable vacuuming and drop all vacuuming data */
1290         talloc_free(ctdb_db->vacuum_handle);
1291         talloc_free(ctdb_db->delete_queue);
1292
1293         /* Terminate any deferred fetch */
1294         talloc_free(ctdb_db->deferred_fetch);
1295
1296         /* Terminate any traverses */
1297         while (ctdb_db->traverse) {
1298                 talloc_free(ctdb_db->traverse);
1299         }
1300
1301         /* Terminate any revokes */
1302         while (ctdb_db->revokechild_active) {
1303                 talloc_free(ctdb_db->revokechild_active);
1304         }
1305
1306         /* Free readonly tracking database */
1307         if (ctdb_db_readonly(ctdb_db)) {
1308                 talloc_free(ctdb_db->rottdb);
1309         }
1310
1311         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1312
1313         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1314                              ctdb_db->db_name));
1315         talloc_free(ctdb_db);
1316
1317         return 0;
1318 }
1319
1320 /*
1321   attach to all existing persistent databases
1322  */
1323 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1324                                   const char *unhealthy_reason)
1325 {
1326         DIR *d;
1327         struct dirent *de;
1328
1329         /* open the persistent db directory and scan it for files */
1330         d = opendir(ctdb->db_directory_persistent);
1331         if (d == NULL) {
1332                 return 0;
1333         }
1334
1335         while ((de=readdir(d))) {
1336                 char *p, *s, *q;
1337                 size_t len = strlen(de->d_name);
1338                 uint32_t node;
1339                 int invalid_name = 0;
1340                 
1341                 s = talloc_strdup(ctdb, de->d_name);
1342                 if (s == NULL) {
1343                         closedir(d);
1344                         CTDB_NO_MEMORY(ctdb, s);
1345                 }
1346
1347                 /* only accept names ending in .tdb */
1348                 p = strstr(s, ".tdb.");
1349                 if (len < 7 || p == NULL) {
1350                         talloc_free(s);
1351                         continue;
1352                 }
1353
1354                 /* only accept names ending with .tdb. and any number of digits */
1355                 q = p+5;
1356                 while (*q != 0 && invalid_name == 0) {
1357                         if (!isdigit(*q++)) {
1358                                 invalid_name = 1;
1359                         }
1360                 }
1361                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1362                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1363                         talloc_free(s);
1364                         continue;
1365                 }
1366                 p[4] = 0;
1367
1368                 if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
1369                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1370                         closedir(d);
1371                         talloc_free(s);
1372                         return -1;
1373                 }
1374
1375                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1376
1377                 talloc_free(s);
1378         }
1379         closedir(d);
1380         return 0;
1381 }
1382
1383 int ctdb_attach_databases(struct ctdb_context *ctdb)
1384 {
1385         int ret;
1386         char *persistent_health_path = NULL;
1387         char *unhealthy_reason = NULL;
1388         bool first_try = true;
1389
1390         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1391                                                  ctdb->db_directory_state,
1392                                                  PERSISTENT_HEALTH_TDB,
1393                                                  ctdb->pnn);
1394         if (persistent_health_path == NULL) {
1395                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1396                 return -1;
1397         }
1398
1399 again:
1400
1401         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1402                                                    0, TDB_DISALLOW_NESTING,
1403                                                    O_CREAT | O_RDWR, 0600);
1404         if (ctdb->db_persistent_health == NULL) {
1405                 struct tdb_wrap *tdb;
1406
1407                 if (!first_try) {
1408                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1409                                           persistent_health_path,
1410                                           errno,
1411                                           strerror(errno)));
1412                         talloc_free(persistent_health_path);
1413                         talloc_free(unhealthy_reason);
1414                         return -1;
1415                 }
1416                 first_try = false;
1417
1418                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1419                                                    persistent_health_path,
1420                                                    "was cleared after a failure",
1421                                                    "manual verification needed");
1422                 if (unhealthy_reason == NULL) {
1423                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1424                         talloc_free(persistent_health_path);
1425                         return -1;
1426                 }
1427
1428                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1429                                   persistent_health_path));
1430                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1431                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1432                                     O_CREAT | O_RDWR, 0600);
1433                 if (tdb) {
1434                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1435                                           persistent_health_path,
1436                                           errno,
1437                                           strerror(errno)));
1438                         talloc_free(persistent_health_path);
1439                         talloc_free(unhealthy_reason);
1440                         return -1;
1441                 }
1442
1443                 talloc_free(tdb);
1444                 goto again;
1445         }
1446         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1447         if (ret != 0) {
1448                 struct tdb_wrap *tdb;
1449
1450                 talloc_free(ctdb->db_persistent_health);
1451                 ctdb->db_persistent_health = NULL;
1452
1453                 if (!first_try) {
1454                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1455                                           persistent_health_path));
1456                         talloc_free(persistent_health_path);
1457                         talloc_free(unhealthy_reason);
1458                         return -1;
1459                 }
1460                 first_try = false;
1461
1462                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1463                                                    persistent_health_path,
1464                                                    "was cleared after a failure",
1465                                                    "manual verification needed");
1466                 if (unhealthy_reason == NULL) {
1467                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1468                         talloc_free(persistent_health_path);
1469                         return -1;
1470                 }
1471
1472                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1473                                   persistent_health_path));
1474                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1475                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1476                                     O_CREAT | O_RDWR, 0600);
1477                 if (tdb) {
1478                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1479                                           persistent_health_path,
1480                                           errno,
1481                                           strerror(errno)));
1482                         talloc_free(persistent_health_path);
1483                         talloc_free(unhealthy_reason);
1484                         return -1;
1485                 }
1486
1487                 talloc_free(tdb);
1488                 goto again;
1489         }
1490         talloc_free(persistent_health_path);
1491
1492         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1493         talloc_free(unhealthy_reason);
1494         if (ret != 0) {
1495                 return ret;
1496         }
1497
1498         return 0;
1499 }
1500
1501 /*
1502   called when a broadcast seqnum update comes in
1503  */
1504 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1505 {
1506         struct ctdb_db_context *ctdb_db;
1507         if (srcnode == ctdb->pnn) {
1508                 /* don't update ourselves! */
1509                 return 0;
1510         }
1511
1512         ctdb_db = find_ctdb_db(ctdb, db_id);
1513         if (!ctdb_db) {
1514                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1515                 return -1;
1516         }
1517
1518         if (ctdb_db->unhealthy_reason) {
1519                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1520                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1521                 return -1;
1522         }
1523
1524         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1525         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1526         return 0;
1527 }
1528
1529 /*
1530   timer to check for seqnum changes in a ltdb and propagate them
1531  */
1532 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1533                                    struct tevent_timer *te,
1534                                    struct timeval t, void *p)
1535 {
1536         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1537         struct ctdb_context *ctdb = ctdb_db->ctdb;
1538         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1539         if (new_seqnum != ctdb_db->seqnum) {
1540                 /* something has changed - propagate it */
1541                 TDB_DATA data;
1542                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1543                 data.dsize = sizeof(uint32_t);
1544                 ctdb_daemon_send_control(ctdb,
1545                                          CTDB_BROADCAST_ACTIVE,
1546                                          0,
1547                                          CTDB_CONTROL_UPDATE_SEQNUM,
1548                                          0,
1549                                          CTDB_CTRL_FLAG_NOREPLY,
1550                                          data,
1551                                          NULL,
1552                                          NULL);
1553         }
1554         ctdb_db->seqnum = new_seqnum;
1555
1556         /* setup a new timer */
1557         ctdb_db->seqnum_update =
1558                 tevent_add_timer(ctdb->ev, ctdb_db,
1559                                  timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1560                                                      (ctdb->tunable.seqnum_interval%1000)*1000),
1561                                  ctdb_ltdb_seqnum_check, ctdb_db);
1562 }
1563
1564 /*
1565   enable seqnum handling on this db
1566  */
1567 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1568 {
1569         struct ctdb_db_context *ctdb_db;
1570         ctdb_db = find_ctdb_db(ctdb, db_id);
1571         if (!ctdb_db) {
1572                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1573                 return -1;
1574         }
1575
1576         if (ctdb_db->seqnum_update == NULL) {
1577                 ctdb_db->seqnum_update = tevent_add_timer(
1578                         ctdb->ev, ctdb_db,
1579                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1580                                             (ctdb->tunable.seqnum_interval%1000)*1000),
1581                         ctdb_ltdb_seqnum_check, ctdb_db);
1582         }
1583
1584         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1585         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1586         return 0;
1587 }
1588
1589 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1590 {
1591         if (ctdb_db_sticky(ctdb_db)) {
1592                 return 0;
1593         }
1594
1595         if (! ctdb_db_volatile(ctdb_db)) {
1596                 DEBUG(DEBUG_ERR,
1597                       ("Non-volatile databases do not support sticky flag\n"));
1598                 return -1;
1599         }
1600
1601         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1602
1603         ctdb_db_set_sticky(ctdb_db);
1604
1605         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1606
1607         return 0;
1608 }
1609
1610 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1611 {
1612         struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1613         int i;
1614
1615         for (i=0; i<MAX_HOT_KEYS; i++) {
1616                 if (s->hot_keys[i].key.dsize > 0) {
1617                         talloc_free(s->hot_keys[i].key.dptr);
1618                 }
1619         }
1620
1621         ZERO_STRUCT(ctdb_db->statistics);
1622 }
1623
1624 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1625                                 uint32_t db_id,
1626                                 TDB_DATA *outdata)
1627 {
1628         struct ctdb_db_context *ctdb_db;
1629         struct ctdb_db_statistics_old *stats;
1630         int i;
1631         int len;
1632         char *ptr;
1633
1634         ctdb_db = find_ctdb_db(ctdb, db_id);
1635         if (!ctdb_db) {
1636                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1637                 return -1;
1638         }
1639
1640         len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1641         for (i = 0; i < MAX_HOT_KEYS; i++) {
1642                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1643         }
1644
1645         stats = talloc_size(outdata, len);
1646         if (stats == NULL) {
1647                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1648                 return -1;
1649         }
1650
1651         memcpy(stats, &ctdb_db->statistics,
1652                offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1653
1654         stats->num_hot_keys = MAX_HOT_KEYS;
1655
1656         ptr = &stats->hot_keys_wire[0];
1657         for (i = 0; i < MAX_HOT_KEYS; i++) {
1658                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1659                        ctdb_db->statistics.hot_keys[i].key.dsize);
1660                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1661         }
1662
1663         outdata->dptr  = (uint8_t *)stats;
1664         outdata->dsize = len;
1665
1666         return 0;
1667 }