build: --picky-developer implies --enable-developer
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31 #include "common/reqid.h"
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
35 /**
36  * write a record to a normal database
37  *
38  * This is the server-variant of the ctdb_ltdb_store function.
39  * It contains logic to determine whether a record should be
40  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41  * controls to the local ctdb daemon if apporpriate.
42  */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44                                   TDB_DATA key,
45                                   struct ctdb_ltdb_header *header,
46                                   TDB_DATA data)
47 {
48         struct ctdb_context *ctdb = ctdb_db->ctdb;
49         TDB_DATA rec;
50         int ret;
51         bool seqnum_suppressed = false;
52         bool keep = false;
53         bool schedule_for_deletion = false;
54         bool remove_from_delete_queue = false;
55         uint32_t lmaster;
56
57         if (ctdb->flags & CTDB_FLAG_TORTURE) {
58                 struct ctdb_ltdb_header *h2;
59                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
61                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64                 }
65                 if (rec.dptr) free(rec.dptr);
66         }
67
68         if (ctdb->vnn_map == NULL) {
69                 /*
70                  * Called from a client: always store the record
71                  * Also don't call ctdb_lmaster since it uses the vnn_map!
72                  */
73                 keep = true;
74                 goto store;
75         }
76
77         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
78
79         /*
80          * If we migrate an empty record off to another node
81          * and the record has not been migrated with data,
82          * delete the record instead of storing the empty record.
83          */
84         if (data.dsize != 0) {
85                 keep = true;
86         } else if (header->flags & CTDB_REC_RO_FLAGS) {
87                 keep = true;
88         } else if (ctdb_db->persistent) {
89                 keep = true;
90         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91                 /*
92                  * The record is not created by the client but
93                  * automatically by the ctdb_ltdb_fetch logic that
94                  * creates a record with an initial header in the
95                  * ltdb before trying to migrate the record from
96                  * the current lmaster. Keep it instead of trying
97                  * to delete the non-existing record...
98                  */
99                 keep = true;
100                 schedule_for_deletion = true;
101         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
102                 keep = true;
103         } else if (ctdb_db->ctdb->pnn == lmaster) {
104                 /*
105                  * If we are lmaster, then we usually keep the record.
106                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
107                  * and the record is empty and has never been migrated
108                  * with data, then we should delete it instead of storing it.
109                  * This is part of the vacuuming process.
110                  *
111                  * The reason that we usually need to store even empty records
112                  * on the lmaster is that a client operating directly on the
113                  * lmaster (== dmaster) expects the local copy of the record to
114                  * exist after successful ctdb migrate call. If the record does
115                  * not exist, the client goes into a migrate loop and eventually
116                  * fails. So storing the empty record makes sure that we do not
117                  * need to change the client code.
118                  */
119                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
120                         keep = true;
121                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
122                         keep = true;
123                 }
124         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
125                 keep = true;
126         }
127
128         if (keep) {
129                 if (!ctdb_db->persistent &&
130                     (ctdb_db->ctdb->pnn == header->dmaster) &&
131                     !(header->flags & CTDB_REC_RO_FLAGS))
132                 {
133                         header->rsn++;
134
135                         if (data.dsize == 0) {
136                                 schedule_for_deletion = true;
137                         }
138                 }
139                 remove_from_delete_queue = !schedule_for_deletion;
140         }
141
142 store:
143         /*
144          * The VACUUM_MIGRATED flag is only set temporarily for
145          * the above logic when the record was retrieved by a
146          * VACUUM_MIGRATE call and should not be stored in the
147          * database.
148          *
149          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
150          * and there are two cases in which the corresponding record
151          * is stored in the local database:
152          * 1. The record has been migrated with data in the past
153          *    (the MIGRATED_WITH_DATA record flag is set).
154          * 2. The record has been filled with data again since it
155          *    had been submitted in the VACUUM_FETCH message to the
156          *    lmaster.
157          * For such records it is important to not store the
158          * VACUUM_MIGRATED flag in the database.
159          */
160         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
161
162         /*
163          * Similarly, clear the AUTOMATIC flag which should not enter
164          * the local database copy since this would require client
165          * modifications to clear the flag when the client stores
166          * the record.
167          */
168         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
169
170         rec.dsize = sizeof(*header) + data.dsize;
171         rec.dptr = talloc_size(ctdb, rec.dsize);
172         CTDB_NO_MEMORY(ctdb, rec.dptr);
173
174         memcpy(rec.dptr, header, sizeof(*header));
175         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
176
177         /* Databases with seqnum updates enabled only get their seqnum
178            changes when/if we modify the data */
179         if (ctdb_db->seqnum_update != NULL) {
180                 TDB_DATA old;
181                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
182
183                 if ( (old.dsize == rec.dsize)
184                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
185                           rec.dptr+sizeof(struct ctdb_ltdb_header),
186                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
187                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
188                         seqnum_suppressed = true;
189                 }
190                 if (old.dptr) free(old.dptr);
191         }
192
193         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
194                             ctdb_db->db_name,
195                             keep?"storing":"deleting",
196                             ctdb_hash(&key)));
197
198         if (keep) {
199                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
200         } else {
201                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
202         }
203
204         if (ret != 0) {
205                 int lvl = DEBUG_ERR;
206
207                 if (keep == false &&
208                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
209                 {
210                         lvl = DEBUG_DEBUG;
211                 }
212
213                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
214                             "%d - %s\n",
215                             ctdb_db->db_name,
216                             keep?"store":"delete", ret,
217                             tdb_errorstr(ctdb_db->ltdb->tdb)));
218
219                 schedule_for_deletion = false;
220                 remove_from_delete_queue = false;
221         }
222         if (seqnum_suppressed) {
223                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
224         }
225
226         talloc_free(rec.dptr);
227
228         if (schedule_for_deletion) {
229                 int ret2;
230                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
231                 if (ret2 != 0) {
232                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
233                 }
234         }
235
236         if (remove_from_delete_queue) {
237                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
238         }
239
240         return ret;
241 }
242
243 struct lock_fetch_state {
244         struct ctdb_context *ctdb;
245         struct ctdb_db_context *ctdb_db;
246         void (*recv_pkt)(void *, struct ctdb_req_header *);
247         void *recv_context;
248         struct ctdb_req_header *hdr;
249         uint32_t generation;
250         bool ignore_generation;
251 };
252
253 /*
254   called when we should retry the operation
255  */
256 static void lock_fetch_callback(void *p, bool locked)
257 {
258         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
259         if (!state->ignore_generation &&
260             state->generation != state->ctdb_db->generation) {
261                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
262                 talloc_free(state->hdr);
263                 return;
264         }
265         state->recv_pkt(state->recv_context, state->hdr);
266         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
267 }
268
269
270 /*
271   do a non-blocking ltdb_lock, deferring this ctdb request until we
272   have the chainlock
273
274   It does the following:
275
276    1) tries to get the chainlock. If it succeeds, then it returns 0
277
278    2) if it fails to get a chainlock immediately then it sets up a
279    non-blocking chainlock via ctdb_lock_record, and when it gets the
280    chainlock it re-submits this ctdb request to the main packet
281    receive function.
282
283    This effectively queues all ctdb requests that cannot be
284    immediately satisfied until it can get the lock. This means that
285    the main ctdb daemon will not block waiting for a chainlock held by
286    a client
287
288    There are 3 possible return values:
289
290        0:    means that it got the lock immediately.
291       -1:    means that it failed to get the lock, and won't retry
292       -2:    means that it failed to get the lock immediately, but will retry
293  */
294 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
295                            TDB_DATA key, struct ctdb_req_header *hdr,
296                            void (*recv_pkt)(void *, struct ctdb_req_header *),
297                            void *recv_context, bool ignore_generation)
298 {
299         int ret;
300         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
301         struct lock_request *lreq;
302         struct lock_fetch_state *state;
303         
304         ret = tdb_chainlock_nonblock(tdb, key);
305
306         if (ret != 0 &&
307             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
308                 /* a hard failure - don't try again */
309                 return -1;
310         }
311
312         /* when torturing, ensure we test the contended path */
313         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
314             random() % 5 == 0) {
315                 ret = -1;
316                 tdb_chainunlock(tdb, key);
317         }
318
319         /* first the non-contended path */
320         if (ret == 0) {
321                 return 0;
322         }
323
324         state = talloc(hdr, struct lock_fetch_state);
325         state->ctdb = ctdb_db->ctdb;
326         state->ctdb_db = ctdb_db;
327         state->hdr = hdr;
328         state->recv_pkt = recv_pkt;
329         state->recv_context = recv_context;
330         state->generation = ctdb_db->generation;
331         state->ignore_generation = ignore_generation;
332
333         /* now the contended path */
334         lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
335         if (lreq == NULL) {
336                 return -1;
337         }
338
339         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
340            so it won't be freed yet */
341         talloc_steal(state, hdr);
342
343         /* now tell the caller than we will retry asynchronously */
344         return -2;
345 }
346
347 /*
348   a varient of ctdb_ltdb_lock_requeue that also fetches the record
349  */
350 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
351                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
352                                  struct ctdb_req_header *hdr, TDB_DATA *data,
353                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
354                                  void *recv_context, bool ignore_generation)
355 {
356         int ret;
357
358         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
359                                      recv_context, ignore_generation);
360         if (ret == 0) {
361                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
362                 if (ret != 0) {
363                         int uret;
364                         uret = ctdb_ltdb_unlock(ctdb_db, key);
365                         if (uret != 0) {
366                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
367                         }
368                 }
369         }
370         return ret;
371 }
372
373
374 /*
375   paraoid check to see if the db is empty
376  */
377 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
378 {
379         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
380         int count = tdb_traverse_read(tdb, NULL, NULL);
381         if (count != 0) {
382                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
383                          ctdb_db->db_path));
384                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
385         }
386 }
387
388 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
389                                 struct ctdb_db_context *ctdb_db)
390 {
391         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
392         char *old;
393         char *reason = NULL;
394         TDB_DATA key;
395         TDB_DATA val;
396
397         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
398         key.dsize = strlen(ctdb_db->db_name);
399
400         old = ctdb_db->unhealthy_reason;
401         ctdb_db->unhealthy_reason = NULL;
402
403         val = tdb_fetch(tdb, key);
404         if (val.dsize > 0) {
405                 reason = talloc_strndup(ctdb_db,
406                                         (const char *)val.dptr,
407                                         val.dsize);
408                 if (reason == NULL) {
409                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
410                                            (int)val.dsize));
411                         ctdb_db->unhealthy_reason = old;
412                         free(val.dptr);
413                         return -1;
414                 }
415         }
416
417         if (val.dptr) {
418                 free(val.dptr);
419         }
420
421         talloc_free(old);
422         ctdb_db->unhealthy_reason = reason;
423         return 0;
424 }
425
426 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
427                                   struct ctdb_db_context *ctdb_db,
428                                   const char *given_reason,/* NULL means healthy */
429                                   int num_healthy_nodes)
430 {
431         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
432         int ret;
433         TDB_DATA key;
434         TDB_DATA val;
435         char *new_reason = NULL;
436         char *old_reason = NULL;
437
438         ret = tdb_transaction_start(tdb);
439         if (ret != 0) {
440                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
441                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
442                 return -1;
443         }
444
445         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
446         if (ret != 0) {
447                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
448                                    ctdb_db->db_name, ret));
449                 return -1;
450         }
451         old_reason = ctdb_db->unhealthy_reason;
452
453         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
454         key.dsize = strlen(ctdb_db->db_name);
455
456         if (given_reason) {
457                 new_reason = talloc_strdup(ctdb_db, given_reason);
458                 if (new_reason == NULL) {
459                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
460                                           given_reason));
461                         return -1;
462                 }
463         } else if (old_reason && num_healthy_nodes == 0) {
464                 /*
465                  * If the reason indicates ok, but there where no healthy nodes
466                  * available, that it means, we have not recovered valid content
467                  * of the db. So if there's an old reason, prefix it with
468                  * "NO-HEALTHY-NODES - "
469                  */
470                 const char *prefix;
471
472 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
473                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
474                 if (ret != 0) {
475                         prefix = _TMP_PREFIX;
476                 } else {
477                         prefix = "";
478                 }
479                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
480                                          prefix, old_reason);
481                 if (new_reason == NULL) {
482                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
483                                           prefix, old_reason));
484                         return -1;
485                 }
486 #undef _TMP_PREFIX
487         }
488
489         if (new_reason) {
490                 val.dptr = discard_const_p(uint8_t, new_reason);
491                 val.dsize = strlen(new_reason);
492
493                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
494                 if (ret != 0) {
495                         tdb_transaction_cancel(tdb);
496                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
497                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
498                                            ret, tdb_errorstr(tdb)));
499                         talloc_free(new_reason);
500                         return -1;
501                 }
502                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
503                                    ctdb_db->db_name, new_reason));
504         } else if (old_reason) {
505                 ret = tdb_delete(tdb, key);
506                 if (ret != 0) {
507                         tdb_transaction_cancel(tdb);
508                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
509                                            tdb_name(tdb), ctdb_db->db_name,
510                                            ret, tdb_errorstr(tdb)));
511                         talloc_free(new_reason);
512                         return -1;
513                 }
514                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
515                                    ctdb_db->db_name));
516         }
517
518         ret = tdb_transaction_commit(tdb);
519         if (ret != TDB_SUCCESS) {
520                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
521                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
522                 talloc_free(new_reason);
523                 return -1;
524         }
525
526         talloc_free(old_reason);
527         ctdb_db->unhealthy_reason = new_reason;
528
529         return 0;
530 }
531
532 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
533                                      struct ctdb_db_context *ctdb_db)
534 {
535         time_t now = time(NULL);
536         char *new_path;
537         char *new_reason;
538         int ret;
539         struct tm *tm;
540
541         tm = gmtime(&now);
542
543         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
544         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
545                                    "%04u%02u%02u%02u%02u%02u.0Z",
546                                    ctdb_db->db_path,
547                                    tm->tm_year+1900, tm->tm_mon+1,
548                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
549                                    tm->tm_sec);
550         if (new_path == NULL) {
551                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
552                 return -1;
553         }
554
555         new_reason = talloc_asprintf(ctdb_db,
556                                      "ERROR - Backup of corrupted TDB in '%s'",
557                                      new_path);
558         if (new_reason == NULL) {
559                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
560                 return -1;
561         }
562         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
563         talloc_free(new_reason);
564         if (ret != 0) {
565                 DEBUG(DEBUG_CRIT,(__location__
566                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
567                                  ctdb_db->db_path));
568                 return -1;
569         }
570
571         ret = rename(ctdb_db->db_path, new_path);
572         if (ret != 0) {
573                 DEBUG(DEBUG_CRIT,(__location__
574                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
575                                   ctdb_db->db_path, new_path,
576                                   errno, strerror(errno)));
577                 talloc_free(new_path);
578                 return -1;
579         }
580
581         DEBUG(DEBUG_CRIT,(__location__
582                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
583                          ctdb_db->db_path, new_path));
584         talloc_free(new_path);
585         return 0;
586 }
587
588 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
589 {
590         struct ctdb_db_context *ctdb_db;
591         int ret;
592         int ok = 0;
593         int fail = 0;
594
595         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
596                 if (!ctdb_db->persistent) {
597                         continue;
598                 }
599
600                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
601                 if (ret != 0) {
602                         DEBUG(DEBUG_ALERT,(__location__
603                                            " load persistent health for '%s' failed\n",
604                                            ctdb_db->db_path));
605                         return -1;
606                 }
607
608                 if (ctdb_db->unhealthy_reason == NULL) {
609                         ok++;
610                         DEBUG(DEBUG_INFO,(__location__
611                                    " persistent db '%s' healthy\n",
612                                    ctdb_db->db_path));
613                         continue;
614                 }
615
616                 fail++;
617                 DEBUG(DEBUG_ALERT,(__location__
618                                    " persistent db '%s' unhealthy: %s\n",
619                                    ctdb_db->db_path,
620                                    ctdb_db->unhealthy_reason));
621         }
622         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
623               ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
624                ok, fail));
625
626         if (fail != 0) {
627                 return -1;
628         }
629
630         return 0;
631 }
632
633
634 /*
635   mark a database - as healthy
636  */
637 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
638 {
639         uint32_t db_id = *(uint32_t *)indata.dptr;
640         struct ctdb_db_context *ctdb_db;
641         int ret;
642         bool may_recover = false;
643
644         ctdb_db = find_ctdb_db(ctdb, db_id);
645         if (!ctdb_db) {
646                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
647                 return -1;
648         }
649
650         if (ctdb_db->unhealthy_reason) {
651                 may_recover = true;
652         }
653
654         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
655         if (ret != 0) {
656                 DEBUG(DEBUG_ERR,(__location__
657                                  " ctdb_update_persistent_health(%s) failed\n",
658                                  ctdb_db->db_name));
659                 return -1;
660         }
661
662         if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
663                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
664                                   ctdb_db->db_name));
665                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
666         }
667
668         return 0;
669 }
670
671 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
672                                    TDB_DATA indata,
673                                    TDB_DATA *outdata)
674 {
675         uint32_t db_id = *(uint32_t *)indata.dptr;
676         struct ctdb_db_context *ctdb_db;
677         int ret;
678
679         ctdb_db = find_ctdb_db(ctdb, db_id);
680         if (!ctdb_db) {
681                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
682                 return -1;
683         }
684
685         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
686         if (ret != 0) {
687                 DEBUG(DEBUG_ERR,(__location__
688                                  " ctdb_load_persistent_health(%s) failed\n",
689                                  ctdb_db->db_name));
690                 return -1;
691         }
692
693         *outdata = tdb_null;
694         if (ctdb_db->unhealthy_reason) {
695                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
696                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
697         }
698
699         return 0;
700 }
701
702
703 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
704 {
705         char *ropath;
706
707         if (ctdb_db->readonly) {
708                 return 0;
709         }
710
711         if (ctdb_db->persistent) {
712                 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
713                 return -1;
714         }
715
716         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
717         if (ropath == NULL) {
718                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
719                 return -1;
720         }
721         ctdb_db->rottdb = tdb_open(ropath, 
722                               ctdb->tunable.database_hash_size, 
723                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
724                               O_CREAT|O_RDWR, 0);
725         if (ctdb_db->rottdb == NULL) {
726                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
727                 talloc_free(ropath);
728                 return -1;
729         }
730
731         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
732
733         ctdb_db->readonly = true;
734
735         DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
736
737         talloc_free(ropath);
738         return 0;
739 }
740
741 /*
742   attach to a database, handling both persistent and non-persistent databases
743   return 0 on success, -1 on failure
744  */
745 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
746                              bool persistent, const char *unhealthy_reason,
747                              bool jenkinshash, bool mutexes)
748 {
749         struct ctdb_db_context *ctdb_db, *tmp_db;
750         int ret;
751         struct TDB_DATA key;
752         unsigned tdb_flags;
753         int mode = 0600;
754         int remaining_tries = 0;
755
756         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
757         CTDB_NO_MEMORY(ctdb, ctdb_db);
758
759         ctdb_db->priority = 1;
760         ctdb_db->ctdb = ctdb;
761         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
762         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
763
764         key.dsize = strlen(db_name)+1;
765         key.dptr  = discard_const(db_name);
766         ctdb_db->db_id = ctdb_hash(&key);
767         ctdb_db->persistent = persistent;
768
769         if (!ctdb_db->persistent) {
770                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
771                 if (ctdb_db->delete_queue == NULL) {
772                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
773                 }
774
775                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
776         }
777
778         /* check for hash collisions */
779         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
780                 if (tmp_db->db_id == ctdb_db->db_id) {
781                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
782                                  tmp_db->db_id, db_name, tmp_db->db_name));
783                         talloc_free(ctdb_db);
784                         return -1;
785                 }
786         }
787
788         if (persistent) {
789                 if (unhealthy_reason) {
790                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
791                                                             unhealthy_reason, 0);
792                         if (ret != 0) {
793                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
794                                                    ctdb_db->db_name, unhealthy_reason, ret));
795                                 talloc_free(ctdb_db);
796                                 return -1;
797                         }
798                 }
799
800                 if (ctdb->max_persistent_check_errors > 0) {
801                         remaining_tries = 1;
802                 }
803                 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
804                         remaining_tries = 0;
805                 }
806
807                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
808                 if (ret != 0) {
809                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
810                                    ctdb_db->db_name, ret));
811                         talloc_free(ctdb_db);
812                         return -1;
813                 }
814         }
815
816         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
817                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
818                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
819                 talloc_free(ctdb_db);
820                 return -1;
821         }
822
823         if (ctdb_db->unhealthy_reason) {
824                 /* this is just a warning, but we want that in the log file! */
825                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
826                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
827         }
828
829         /* open the database */
830         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
831                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
832                                            db_name, ctdb->pnn);
833
834         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
835         if (ctdb->valgrinding) {
836                 tdb_flags |= TDB_NOMMAP;
837         }
838         tdb_flags |= TDB_DISALLOW_NESTING;
839         if (jenkinshash) {
840                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
841         }
842 #ifdef TDB_MUTEX_LOCKING
843         if (ctdb->tunable.mutex_enabled && mutexes &&
844             tdb_runtime_check_for_robust_mutexes()) {
845                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
846         }
847 #endif
848
849 again:
850         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
851                                       ctdb->tunable.database_hash_size, 
852                                       tdb_flags, 
853                                       O_CREAT|O_RDWR, mode);
854         if (ctdb_db->ltdb == NULL) {
855                 struct stat st;
856                 int saved_errno = errno;
857
858                 if (!persistent) {
859                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
860                                           ctdb_db->db_path,
861                                           saved_errno,
862                                           strerror(saved_errno)));
863                         talloc_free(ctdb_db);
864                         return -1;
865                 }
866
867                 if (remaining_tries == 0) {
868                         DEBUG(DEBUG_CRIT,(__location__
869                                           "Failed to open persistent tdb '%s': %d - %s\n",
870                                           ctdb_db->db_path,
871                                           saved_errno,
872                                           strerror(saved_errno)));
873                         talloc_free(ctdb_db);
874                         return -1;
875                 }
876
877                 ret = stat(ctdb_db->db_path, &st);
878                 if (ret != 0) {
879                         DEBUG(DEBUG_CRIT,(__location__
880                                           "Failed to open persistent tdb '%s': %d - %s\n",
881                                           ctdb_db->db_path,
882                                           saved_errno,
883                                           strerror(saved_errno)));
884                         talloc_free(ctdb_db);
885                         return -1;
886                 }
887
888                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
889                 if (ret != 0) {
890                         DEBUG(DEBUG_CRIT,(__location__
891                                           "Failed to open persistent tdb '%s': %d - %s\n",
892                                           ctdb_db->db_path,
893                                           saved_errno,
894                                           strerror(saved_errno)));
895                         talloc_free(ctdb_db);
896                         return -1;
897                 }
898
899                 remaining_tries--;
900                 mode = st.st_mode;
901                 goto again;
902         }
903
904         if (!persistent) {
905                 ctdb_check_db_empty(ctdb_db);
906         } else {
907                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
908                 if (ret != 0) {
909                         int fd;
910                         struct stat st;
911
912                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
913                                           ctdb_db->db_path, ret,
914                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
915                         if (remaining_tries == 0) {
916                                 talloc_free(ctdb_db);
917                                 return -1;
918                         }
919
920                         fd = tdb_fd(ctdb_db->ltdb->tdb);
921                         ret = fstat(fd, &st);
922                         if (ret != 0) {
923                                 DEBUG(DEBUG_CRIT,(__location__
924                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
925                                                   ctdb_db->db_path,
926                                                   errno,
927                                                   strerror(errno)));
928                                 talloc_free(ctdb_db);
929                                 return -1;
930                         }
931
932                         /* close the TDB */
933                         talloc_free(ctdb_db->ltdb);
934                         ctdb_db->ltdb = NULL;
935
936                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
937                         if (ret != 0) {
938                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
939                                                   ctdb_db->db_path));
940                                 talloc_free(ctdb_db);
941                                 return -1;
942                         }
943
944                         remaining_tries--;
945                         mode = st.st_mode;
946                         goto again;
947                 }
948         }
949
950         /* set up a rb tree we can use to track which records we have a 
951            fetch-lock in-flight for so we can defer any additional calls
952            for the same record.
953          */
954         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
955         if (ctdb_db->deferred_fetch == NULL) {
956                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
957                 talloc_free(ctdb_db);
958                 return -1;
959         }
960
961         ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
962         if (ctdb_db->defer_dmaster == NULL) {
963                 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
964                                   ctdb_db->db_name));
965                 talloc_free(ctdb_db);
966                 return -1;
967         }
968
969         DLIST_ADD(ctdb->db_list, ctdb_db);
970
971         /* setting this can help some high churn databases */
972         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
973
974         /* 
975            all databases support the "null" function. we need this in
976            order to do forced migration of records
977         */
978         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
979         if (ret != 0) {
980                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
981                 talloc_free(ctdb_db);
982                 return -1;
983         }
984
985         /* 
986            all databases support the "fetch" function. we need this
987            for efficient Samba3 ctdb fetch
988         */
989         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
990         if (ret != 0) {
991                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
992                 talloc_free(ctdb_db);
993                 return -1;
994         }
995
996         /* 
997            all databases support the "fetch_with_header" function. we need this
998            for efficient readonly record fetches
999         */
1000         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1001         if (ret != 0) {
1002                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1003                 talloc_free(ctdb_db);
1004                 return -1;
1005         }
1006
1007         ret = ctdb_vacuum_init(ctdb_db);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1010                                   "database '%s'\n", ctdb_db->db_name));
1011                 talloc_free(ctdb_db);
1012                 return -1;
1013         }
1014
1015         ctdb_db->generation = ctdb->vnn_map->generation;
1016
1017         DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1018                             ctdb_db->db_path, tdb_flags));
1019
1020         /* success */
1021         return 0;
1022 }
1023
1024
1025 struct ctdb_deferred_attach_context {
1026         struct ctdb_deferred_attach_context *next, *prev;
1027         struct ctdb_context *ctdb;
1028         struct ctdb_req_control *c;
1029 };
1030
1031
1032 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1033 {
1034         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1035
1036         return 0;
1037 }
1038
1039 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1040 {
1041         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1042         struct ctdb_context *ctdb = da_ctx->ctdb;
1043
1044         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1045         talloc_free(da_ctx);
1046 }
1047
1048 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1049 {
1050         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1051         struct ctdb_context *ctdb = da_ctx->ctdb;
1052
1053         /* This talloc-steals the packet ->c */
1054         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1055         talloc_free(da_ctx);
1056 }
1057
1058 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1059 {
1060         struct ctdb_deferred_attach_context *da_ctx;
1061
1062         /* call it from the main event loop as soon as the current event 
1063            finishes.
1064          */
1065         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1066                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1067                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1068         }
1069
1070         return 0;
1071 }
1072
1073 /*
1074   a client has asked to attach a new database
1075  */
1076 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1077                                TDB_DATA *outdata, uint64_t tdb_flags, 
1078                                bool persistent, uint32_t client_id,
1079                                struct ctdb_req_control *c,
1080                                bool *async_reply)
1081 {
1082         const char *db_name = (const char *)indata.dptr;
1083         struct ctdb_db_context *db;
1084         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1085         struct ctdb_client *client = NULL;
1086         bool with_jenkinshash, with_mutexes;
1087
1088         if (ctdb->tunable.allow_client_db_attach == 0) {
1089                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1090                                   "AllowClientDBAccess == 0\n", db_name));
1091                 return -1;
1092         }
1093
1094         /* dont allow any local clients to attach while we are in recovery mode
1095          * except for the recovery daemon.
1096          * allow all attach from the network since these are always from remote
1097          * recovery daemons.
1098          */
1099         if (client_id != 0) {
1100                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1101         }
1102         if (client != NULL) {
1103                 /* If the node is inactive it is not part of the cluster
1104                    and we should not allow clients to attach to any
1105                    databases
1106                 */
1107                 if (node->flags & NODE_FLAGS_INACTIVE) {
1108                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1109                         return -1;
1110                 }
1111
1112                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1113                     client->pid != ctdb->recoverd_pid &&
1114                     ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1115                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1116
1117                         if (da_ctx == NULL) {
1118                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1119                                 return -1;
1120                         }
1121
1122                         da_ctx->ctdb = ctdb;
1123                         da_ctx->c = talloc_steal(da_ctx, c);
1124                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1125                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1126
1127                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1128
1129                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1130                         *async_reply = true;
1131                         return 0;
1132                 }
1133         }
1134
1135         /* the client can optionally pass additional tdb flags, but we
1136            only allow a subset of those on the database in ctdb. Note
1137            that tdb_flags is passed in via the (otherwise unused)
1138            srvid to the attach control */
1139 #ifdef TDB_MUTEX_LOCKING
1140         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1141 #else
1142         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1143 #endif
1144
1145         /* see if we already have this name */
1146         db = ctdb_db_handle(ctdb, db_name);
1147         if (db) {
1148                 if (db->persistent != persistent) {
1149                         DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1150                                           "database %s\n", persistent ? "" : "non-",
1151                                           db-> persistent ? "" : "non-", db_name));
1152                         return -1;
1153                 }
1154                 outdata->dptr  = (uint8_t *)&db->db_id;
1155                 outdata->dsize = sizeof(db->db_id);
1156                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1157                 return 0;
1158         }
1159
1160         with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1161 #ifdef TDB_MUTEX_LOCKING
1162         with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1163 #else
1164         with_mutexes = false;
1165 #endif
1166
1167         if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1168                               with_jenkinshash, with_mutexes) != 0) {
1169                 return -1;
1170         }
1171
1172         db = ctdb_db_handle(ctdb, db_name);
1173         if (!db) {
1174                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1175                 return -1;
1176         }
1177
1178         /* remember the flags the client has specified */
1179         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1180
1181         outdata->dptr  = (uint8_t *)&db->db_id;
1182         outdata->dsize = sizeof(db->db_id);
1183
1184         /* Try to ensure it's locked in mem */
1185         lockdown_memory(ctdb->valgrinding);
1186
1187         /* tell all the other nodes about this database */
1188         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1189                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1190                                                 CTDB_CONTROL_DB_ATTACH,
1191                                  0, CTDB_CTRL_FLAG_NOREPLY,
1192                                  indata, NULL, NULL);
1193
1194         /* success */
1195         return 0;
1196 }
1197
1198 /*
1199  * a client has asked to detach from a database
1200  */
1201 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1202                                uint32_t client_id)
1203 {
1204         uint32_t db_id;
1205         struct ctdb_db_context *ctdb_db;
1206         struct ctdb_client *client = NULL;
1207
1208         db_id = *(uint32_t *)indata.dptr;
1209         ctdb_db = find_ctdb_db(ctdb, db_id);
1210         if (ctdb_db == NULL) {
1211                 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1212                                   db_id));
1213                 return -1;
1214         }
1215
1216         if (ctdb->tunable.allow_client_db_attach == 1) {
1217                 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1218                                   "Clients are allowed access to databases "
1219                                   "(AllowClientDBAccess == 1)\n",
1220                                   ctdb_db->db_name));
1221                 return -1;
1222         }
1223
1224         if (ctdb_db->persistent) {
1225                 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1226                                   "denied\n", ctdb_db->db_name));
1227                 return -1;
1228         }
1229
1230         /* Cannot detach from database when in recovery */
1231         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1232                 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1233                 return -1;
1234         }
1235
1236         /* If a control comes from a client, then broadcast it to all nodes.
1237          * Do the actual detach only if the control comes from other daemons.
1238          */
1239         if (client_id != 0) {
1240                 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1241                 if (client != NULL) {
1242                         /* forward the control to all the nodes */
1243                         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1244                                                  CTDB_CONTROL_DB_DETACH, 0,
1245                                                  CTDB_CTRL_FLAG_NOREPLY,
1246                                                  indata, NULL, NULL);
1247                         return 0;
1248                 }
1249                 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1250                                   "for database '%s'\n", ctdb_db->db_name));
1251                 return -1;
1252         }
1253
1254         /* Detach database from recoverd */
1255         if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1256                                      CTDB_SRVID_DETACH_DATABASE,
1257                                      indata) != 0) {
1258                 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1259                 return -1;
1260         }
1261
1262         /* Disable vacuuming and drop all vacuuming data */
1263         talloc_free(ctdb_db->vacuum_handle);
1264         talloc_free(ctdb_db->delete_queue);
1265
1266         /* Terminate any deferred fetch */
1267         talloc_free(ctdb_db->deferred_fetch);
1268
1269         /* Terminate any traverses */
1270         while (ctdb_db->traverse) {
1271                 talloc_free(ctdb_db->traverse);
1272         }
1273
1274         /* Terminate any revokes */
1275         while (ctdb_db->revokechild_active) {
1276                 talloc_free(ctdb_db->revokechild_active);
1277         }
1278
1279         /* Free readonly tracking database */
1280         if (ctdb_db->readonly) {
1281                 talloc_free(ctdb_db->rottdb);
1282         }
1283
1284         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1285
1286         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1287                              ctdb_db->db_name));
1288         talloc_free(ctdb_db);
1289
1290         return 0;
1291 }
1292
1293 /*
1294   attach to all existing persistent databases
1295  */
1296 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1297                                   const char *unhealthy_reason)
1298 {
1299         DIR *d;
1300         struct dirent *de;
1301
1302         /* open the persistent db directory and scan it for files */
1303         d = opendir(ctdb->db_directory_persistent);
1304         if (d == NULL) {
1305                 return 0;
1306         }
1307
1308         while ((de=readdir(d))) {
1309                 char *p, *s, *q;
1310                 size_t len = strlen(de->d_name);
1311                 uint32_t node;
1312                 int invalid_name = 0;
1313                 
1314                 s = talloc_strdup(ctdb, de->d_name);
1315                 if (s == NULL) {
1316                         closedir(d);
1317                         CTDB_NO_MEMORY(ctdb, s);
1318                 }
1319
1320                 /* only accept names ending in .tdb */
1321                 p = strstr(s, ".tdb.");
1322                 if (len < 7 || p == NULL) {
1323                         talloc_free(s);
1324                         continue;
1325                 }
1326
1327                 /* only accept names ending with .tdb. and any number of digits */
1328                 q = p+5;
1329                 while (*q != 0 && invalid_name == 0) {
1330                         if (!isdigit(*q++)) {
1331                                 invalid_name = 1;
1332                         }
1333                 }
1334                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1335                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1336                         talloc_free(s);
1337                         continue;
1338                 }
1339                 p[4] = 0;
1340
1341                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1342                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1343                         closedir(d);
1344                         talloc_free(s);
1345                         return -1;
1346                 }
1347
1348                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1349
1350                 talloc_free(s);
1351         }
1352         closedir(d);
1353         return 0;
1354 }
1355
1356 int ctdb_attach_databases(struct ctdb_context *ctdb)
1357 {
1358         int ret;
1359         char *persistent_health_path = NULL;
1360         char *unhealthy_reason = NULL;
1361         bool first_try = true;
1362
1363         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1364                                                  ctdb->db_directory_state,
1365                                                  PERSISTENT_HEALTH_TDB,
1366                                                  ctdb->pnn);
1367         if (persistent_health_path == NULL) {
1368                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1369                 return -1;
1370         }
1371
1372 again:
1373
1374         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1375                                                    0, TDB_DISALLOW_NESTING,
1376                                                    O_CREAT | O_RDWR, 0600);
1377         if (ctdb->db_persistent_health == NULL) {
1378                 struct tdb_wrap *tdb;
1379
1380                 if (!first_try) {
1381                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1382                                           persistent_health_path,
1383                                           errno,
1384                                           strerror(errno)));
1385                         talloc_free(persistent_health_path);
1386                         talloc_free(unhealthy_reason);
1387                         return -1;
1388                 }
1389                 first_try = false;
1390
1391                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1392                                                    persistent_health_path,
1393                                                    "was cleared after a failure",
1394                                                    "manual verification needed");
1395                 if (unhealthy_reason == NULL) {
1396                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1397                         talloc_free(persistent_health_path);
1398                         return -1;
1399                 }
1400
1401                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1402                                   persistent_health_path));
1403                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1404                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1405                                     O_CREAT | O_RDWR, 0600);
1406                 if (tdb) {
1407                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1408                                           persistent_health_path,
1409                                           errno,
1410                                           strerror(errno)));
1411                         talloc_free(persistent_health_path);
1412                         talloc_free(unhealthy_reason);
1413                         return -1;
1414                 }
1415
1416                 talloc_free(tdb);
1417                 goto again;
1418         }
1419         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1420         if (ret != 0) {
1421                 struct tdb_wrap *tdb;
1422
1423                 talloc_free(ctdb->db_persistent_health);
1424                 ctdb->db_persistent_health = NULL;
1425
1426                 if (!first_try) {
1427                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1428                                           persistent_health_path));
1429                         talloc_free(persistent_health_path);
1430                         talloc_free(unhealthy_reason);
1431                         return -1;
1432                 }
1433                 first_try = false;
1434
1435                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1436                                                    persistent_health_path,
1437                                                    "was cleared after a failure",
1438                                                    "manual verification needed");
1439                 if (unhealthy_reason == NULL) {
1440                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1441                         talloc_free(persistent_health_path);
1442                         return -1;
1443                 }
1444
1445                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1446                                   persistent_health_path));
1447                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1448                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1449                                     O_CREAT | O_RDWR, 0600);
1450                 if (tdb) {
1451                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1452                                           persistent_health_path,
1453                                           errno,
1454                                           strerror(errno)));
1455                         talloc_free(persistent_health_path);
1456                         talloc_free(unhealthy_reason);
1457                         return -1;
1458                 }
1459
1460                 talloc_free(tdb);
1461                 goto again;
1462         }
1463         talloc_free(persistent_health_path);
1464
1465         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1466         talloc_free(unhealthy_reason);
1467         if (ret != 0) {
1468                 return ret;
1469         }
1470
1471         return 0;
1472 }
1473
1474 /*
1475   called when a broadcast seqnum update comes in
1476  */
1477 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1478 {
1479         struct ctdb_db_context *ctdb_db;
1480         if (srcnode == ctdb->pnn) {
1481                 /* don't update ourselves! */
1482                 return 0;
1483         }
1484
1485         ctdb_db = find_ctdb_db(ctdb, db_id);
1486         if (!ctdb_db) {
1487                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1488                 return -1;
1489         }
1490
1491         if (ctdb_db->unhealthy_reason) {
1492                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1493                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1494                 return -1;
1495         }
1496
1497         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1498         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1499         return 0;
1500 }
1501
1502 /*
1503   timer to check for seqnum changes in a ltdb and propogate them
1504  */
1505 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1506                                    struct timeval t, void *p)
1507 {
1508         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1509         struct ctdb_context *ctdb = ctdb_db->ctdb;
1510         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1511         if (new_seqnum != ctdb_db->seqnum) {
1512                 /* something has changed - propogate it */
1513                 TDB_DATA data;
1514                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1515                 data.dsize = sizeof(uint32_t);
1516                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1517                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1518                                          data, NULL, NULL);             
1519         }
1520         ctdb_db->seqnum = new_seqnum;
1521
1522         /* setup a new timer */
1523         ctdb_db->seqnum_update =
1524                 event_add_timed(ctdb->ev, ctdb_db, 
1525                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1526                                 ctdb_ltdb_seqnum_check, ctdb_db);
1527 }
1528
1529 /*
1530   enable seqnum handling on this db
1531  */
1532 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1533 {
1534         struct ctdb_db_context *ctdb_db;
1535         ctdb_db = find_ctdb_db(ctdb, db_id);
1536         if (!ctdb_db) {
1537                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1538                 return -1;
1539         }
1540
1541         if (ctdb_db->seqnum_update == NULL) {
1542                 ctdb_db->seqnum_update =
1543                         event_add_timed(ctdb->ev, ctdb_db, 
1544                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1545                                         ctdb_ltdb_seqnum_check, ctdb_db);
1546         }
1547
1548         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1549         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1550         return 0;
1551 }
1552
1553 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1554                                      uint32_t client_id)
1555 {
1556         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1557         struct ctdb_db_context *ctdb_db;
1558
1559         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1560         if (!ctdb_db) {
1561                 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1562                         DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
1563                                          db_prio->db_id));
1564                 }
1565                 return 0;
1566         }
1567
1568         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1569                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1570                 return 0;
1571         }
1572
1573         ctdb_db->priority = db_prio->priority;
1574         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1575
1576         if (client_id != 0) {
1577                 /* Broadcast the update to the rest of the cluster */
1578                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1579                                          CTDB_CONTROL_SET_DB_PRIORITY, 0,
1580                                          CTDB_CTRL_FLAG_NOREPLY, indata,
1581                                          NULL, NULL);
1582         }
1583         return 0;
1584 }
1585
1586
1587 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1588 {
1589         if (ctdb_db->sticky) {
1590                 return 0;
1591         }
1592
1593         if (ctdb_db->persistent) {
1594                 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1595                 return -1;
1596         }
1597
1598         ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1599
1600         ctdb_db->sticky = true;
1601
1602         DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1603
1604         return 0;
1605 }
1606
1607 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1608 {
1609         struct ctdb_db_statistics *s = &ctdb_db->statistics;
1610         int i;
1611
1612         for (i=0; i<MAX_HOT_KEYS; i++) {
1613                 if (s->hot_keys[i].key.dsize > 0) {
1614                         talloc_free(s->hot_keys[i].key.dptr);
1615                 }
1616         }
1617
1618         ZERO_STRUCT(ctdb_db->statistics);
1619 }
1620
1621 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1622                                 uint32_t db_id,
1623                                 TDB_DATA *outdata)
1624 {
1625         struct ctdb_db_context *ctdb_db;
1626         struct ctdb_db_statistics *stats;
1627         int i;
1628         int len;
1629         char *ptr;
1630
1631         ctdb_db = find_ctdb_db(ctdb, db_id);
1632         if (!ctdb_db) {
1633                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1634                 return -1;
1635         }
1636
1637         len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1638         for (i = 0; i < MAX_HOT_KEYS; i++) {
1639                 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1640         }
1641
1642         stats = talloc_size(outdata, len);
1643         if (stats == NULL) {
1644                 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1645                 return -1;
1646         }
1647
1648         memcpy(stats, &ctdb_db->statistics,
1649                offsetof(struct ctdb_db_statistics, hot_keys_wire));
1650
1651         stats->num_hot_keys = MAX_HOT_KEYS;
1652
1653         ptr = &stats->hot_keys_wire[0];
1654         for (i = 0; i < MAX_HOT_KEYS; i++) {
1655                 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1656                        ctdb_db->statistics.hot_keys[i].key.dsize);
1657                 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1658         }
1659
1660         outdata->dptr  = (uint8_t *)stats;
1661         outdata->dsize = len;
1662
1663         return 0;
1664 }