Merge branch 'master' of ssh://git.samba.org/data/git/ctdb
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
35 /**
36  * write a record to a normal database
37  *
38  * This is the server-variant of the ctdb_ltdb_store function.
39  * It contains logic to determine whether a record should be
40  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41  * controls to the local ctdb daemon if apporpriate.
42  */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44                                   TDB_DATA key,
45                                   struct ctdb_ltdb_header *header,
46                                   TDB_DATA data)
47 {
48         struct ctdb_context *ctdb = ctdb_db->ctdb;
49         TDB_DATA rec;
50         int ret;
51         bool seqnum_suppressed = false;
52         bool keep = false;
53         bool schedule_for_deletion = false;
54         bool remove_from_delete_queue = false;
55         uint32_t lmaster;
56
57         if (ctdb->flags & CTDB_FLAG_TORTURE) {
58                 struct ctdb_ltdb_header *h2;
59                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
61                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64                 }
65                 if (rec.dptr) free(rec.dptr);
66         }
67
68         if (ctdb->vnn_map == NULL) {
69                 /*
70                  * Called from a client: always store the record
71                  * Also don't call ctdb_lmaster since it uses the vnn_map!
72                  */
73                 keep = true;
74                 goto store;
75         }
76
77         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
78
79         /*
80          * If we migrate an empty record off to another node
81          * and the record has not been migrated with data,
82          * delete the record instead of storing the empty record.
83          */
84         if (data.dsize != 0) {
85                 keep = true;
86         } else if (ctdb_db->persistent) {
87                 keep = true;
88         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
89                 /*
90                  * The record is not created by the client but
91                  * automatically by the ctdb_ltdb_fetch logic that
92                  * creates a record with an initial header in the
93                  * ltdb before trying to migrate the record from
94                  * the current lmaster. Keep it instead of trying
95                  * to delete the non-existing record...
96                  */
97                 keep = true;
98                 schedule_for_deletion = true;
99         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
100                 keep = true;
101         } else if (ctdb_db->ctdb->pnn == lmaster) {
102                 /*
103                  * If we are lmaster, then we usually keep the record.
104                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
105                  * and the record is empty and has never been migrated
106                  * with data, then we should delete it instead of storing it.
107                  * This is part of the vacuuming process.
108                  *
109                  * The reason that we usually need to store even empty records
110                  * on the lmaster is that a client operating directly on the
111                  * lmaster (== dmaster) expects the local copy of the record to
112                  * exist after successful ctdb migrate call. If the record does
113                  * not exist, the client goes into a migrate loop and eventually
114                  * fails. So storing the empty record makes sure that we do not
115                  * need to change the client code.
116                  */
117                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
118                         keep = true;
119                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
120                         keep = true;
121                 }
122         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
123                 keep = true;
124         }
125
126         if (keep) {
127                 if ((data.dsize == 0) &&
128                     !ctdb_db->persistent &&
129                     (ctdb_db->ctdb->pnn == header->dmaster))
130                 {
131                         schedule_for_deletion = true;
132                 }
133                 remove_from_delete_queue = !schedule_for_deletion;
134         }
135
136 store:
137         /*
138          * The VACUUM_MIGRATED flag is only set temporarily for
139          * the above logic when the record was retrieved by a
140          * VACUUM_MIGRATE call and should not be stored in the
141          * database.
142          *
143          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
144          * and there are two cases in which the corresponding record
145          * is stored in the local database:
146          * 1. The record has been migrated with data in the past
147          *    (the MIGRATED_WITH_DATA record flag is set).
148          * 2. The record has been filled with data again since it
149          *    had been submitted in the VACUUM_FETCH message to the
150          *    lmaster.
151          * For such records it is important to not store the
152          * VACUUM_MIGRATED flag in the database.
153          */
154         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
155
156         /*
157          * Similarly, clear the AUTOMATIC flag which should not enter
158          * the local database copy since this would require client
159          * modifications to clear the flag when the client stores
160          * the record.
161          */
162         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
163
164         rec.dsize = sizeof(*header) + data.dsize;
165         rec.dptr = talloc_size(ctdb, rec.dsize);
166         CTDB_NO_MEMORY(ctdb, rec.dptr);
167
168         memcpy(rec.dptr, header, sizeof(*header));
169         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
170
171         /* Databases with seqnum updates enabled only get their seqnum
172            changes when/if we modify the data */
173         if (ctdb_db->seqnum_update != NULL) {
174                 TDB_DATA old;
175                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
176
177                 if ( (old.dsize == rec.dsize)
178                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
179                           rec.dptr+sizeof(struct ctdb_ltdb_header),
180                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
181                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
182                         seqnum_suppressed = true;
183                 }
184                 if (old.dptr) free(old.dptr);
185         }
186
187         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
188                             ctdb_db->db_name,
189                             keep?"storing":"deleting",
190                             ctdb_hash(&key)));
191
192         if (keep) {
193                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
194         } else {
195                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
196         }
197
198         if (ret != 0) {
199                 int lvl = DEBUG_ERR;
200
201                 if (keep == false &&
202                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
203                 {
204                         lvl = DEBUG_DEBUG;
205                 }
206
207                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
208                             "%d - %s\n",
209                             ctdb_db->db_name,
210                             keep?"store":"delete", ret,
211                             tdb_errorstr(ctdb_db->ltdb->tdb)));
212
213                 schedule_for_deletion = false;
214                 remove_from_delete_queue = false;
215         }
216         if (seqnum_suppressed) {
217                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218         }
219
220         talloc_free(rec.dptr);
221
222         if (schedule_for_deletion) {
223                 int ret2;
224                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
225                 if (ret2 != 0) {
226                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
227                 }
228         }
229
230         if (remove_from_delete_queue) {
231                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
232         }
233
234         return ret;
235 }
236
/*
  state kept for a request that is waiting for a chainlock, so the
  packet can later be handed back to its receive function
 */
struct lock_fetch_state {
	/* daemon context; its vnn_map generation is checked on retry */
	struct ctdb_context *ctdb;

	/* receive function the packet is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);

	/* first argument passed to recv_pkt */
	void *recv_context;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;

	/* when true, re-submit even if the generation has changed */
	bool ignore_generation;
};
245
246 /*
247   called when we should retry the operation
248  */
249 static void lock_fetch_callback(void *p)
250 {
251         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
252         if (!state->ignore_generation &&
253             state->generation != state->ctdb->vnn_map->generation) {
254                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
255                 talloc_free(state->hdr);
256                 return;
257         }
258         state->recv_pkt(state->recv_context, state->hdr);
259         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
260 }
261
262
263 /*
264   do a non-blocking ltdb_lock, deferring this ctdb request until we
265   have the chainlock
266
267   It does the following:
268
269    1) tries to get the chainlock. If it succeeds, then it returns 0
270
271    2) if it fails to get a chainlock immediately then it sets up a
272    non-blocking chainlock via ctdb_lockwait, and when it gets the
273    chainlock it re-submits this ctdb request to the main packet
274    receive function
275
276    This effectively queues all ctdb requests that cannot be
277    immediately satisfied until it can get the lock. This means that
278    the main ctdb daemon will not block waiting for a chainlock held by
279    a client
280
281    There are 3 possible return values:
282
283        0:    means that it got the lock immediately.
284       -1:    means that it failed to get the lock, and won't retry
285       -2:    means that it failed to get the lock immediately, but will retry
286  */
287 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
288                            TDB_DATA key, struct ctdb_req_header *hdr,
289                            void (*recv_pkt)(void *, struct ctdb_req_header *),
290                            void *recv_context, bool ignore_generation)
291 {
292         int ret;
293         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
294         struct lockwait_handle *h;
295         struct lock_fetch_state *state;
296         
297         ret = tdb_chainlock_nonblock(tdb, key);
298
299         if (ret != 0 &&
300             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
301                 /* a hard failure - don't try again */
302                 return -1;
303         }
304
305         /* when torturing, ensure we test the contended path */
306         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
307             random() % 5 == 0) {
308                 ret = -1;
309                 tdb_chainunlock(tdb, key);
310         }
311
312         /* first the non-contended path */
313         if (ret == 0) {
314                 return 0;
315         }
316
317         state = talloc(hdr, struct lock_fetch_state);
318         state->ctdb = ctdb_db->ctdb;
319         state->hdr = hdr;
320         state->recv_pkt = recv_pkt;
321         state->recv_context = recv_context;
322         state->generation = ctdb_db->ctdb->vnn_map->generation;
323         state->ignore_generation = ignore_generation;
324
325         /* now the contended path */
326         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
327         if (h == NULL) {
328                 return -1;
329         }
330
331         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
332            so it won't be freed yet */
333         talloc_steal(state, hdr);
334         talloc_steal(state, h);
335
336         /* now tell the caller than we will retry asynchronously */
337         return -2;
338 }
339
340 /*
341   a varient of ctdb_ltdb_lock_requeue that also fetches the record
342  */
343 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
344                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
345                                  struct ctdb_req_header *hdr, TDB_DATA *data,
346                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
347                                  void *recv_context, bool ignore_generation)
348 {
349         int ret;
350
351         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
352                                      recv_context, ignore_generation);
353         if (ret == 0) {
354                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
355                 if (ret != 0) {
356                         int uret;
357                         uret = ctdb_ltdb_unlock(ctdb_db, key);
358                         if (uret != 0) {
359                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
360                         }
361                 }
362         }
363         return ret;
364 }
365
366
367 /*
368   paraoid check to see if the db is empty
369  */
370 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
371 {
372         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
373         int count = tdb_traverse_read(tdb, NULL, NULL);
374         if (count != 0) {
375                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
376                          ctdb_db->db_path));
377                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
378         }
379 }
380
381 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
382                                 struct ctdb_db_context *ctdb_db)
383 {
384         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
385         char *old;
386         char *reason = NULL;
387         TDB_DATA key;
388         TDB_DATA val;
389
390         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
391         key.dsize = strlen(ctdb_db->db_name);
392
393         old = ctdb_db->unhealthy_reason;
394         ctdb_db->unhealthy_reason = NULL;
395
396         val = tdb_fetch(tdb, key);
397         if (val.dsize > 0) {
398                 reason = talloc_strndup(ctdb_db,
399                                         (const char *)val.dptr,
400                                         val.dsize);
401                 if (reason == NULL) {
402                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
403                                            (int)val.dsize));
404                         ctdb_db->unhealthy_reason = old;
405                         free(val.dptr);
406                         return -1;
407                 }
408         }
409
410         if (val.dptr) {
411                 free(val.dptr);
412         }
413
414         talloc_free(old);
415         ctdb_db->unhealthy_reason = reason;
416         return 0;
417 }
418
419 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
420                                   struct ctdb_db_context *ctdb_db,
421                                   const char *given_reason,/* NULL means healthy */
422                                   int num_healthy_nodes)
423 {
424         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
425         int ret;
426         TDB_DATA key;
427         TDB_DATA val;
428         char *new_reason = NULL;
429         char *old_reason = NULL;
430
431         ret = tdb_transaction_start(tdb);
432         if (ret != 0) {
433                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
434                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
435                 return -1;
436         }
437
438         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
439         if (ret != 0) {
440                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
441                                    ctdb_db->db_name, ret));
442                 return -1;
443         }
444         old_reason = ctdb_db->unhealthy_reason;
445
446         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
447         key.dsize = strlen(ctdb_db->db_name);
448
449         if (given_reason) {
450                 new_reason = talloc_strdup(ctdb_db, given_reason);
451                 if (new_reason == NULL) {
452                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
453                                           given_reason));
454                         return -1;
455                 }
456         } else if (old_reason && num_healthy_nodes == 0) {
457                 /*
458                  * If the reason indicates ok, but there where no healthy nodes
459                  * available, that it means, we have not recovered valid content
460                  * of the db. So if there's an old reason, prefix it with
461                  * "NO-HEALTHY-NODES - "
462                  */
463                 const char *prefix;
464
465 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
466                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
467                 if (ret != 0) {
468                         prefix = _TMP_PREFIX;
469                 } else {
470                         prefix = "";
471                 }
472                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
473                                          prefix, old_reason);
474                 if (new_reason == NULL) {
475                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
476                                           prefix, old_reason));
477                         return -1;
478                 }
479 #undef _TMP_PREFIX
480         }
481
482         if (new_reason) {
483                 val.dptr = discard_const_p(uint8_t, new_reason);
484                 val.dsize = strlen(new_reason);
485
486                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
487                 if (ret != 0) {
488                         tdb_transaction_cancel(tdb);
489                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
490                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
491                                            ret, tdb_errorstr(tdb)));
492                         talloc_free(new_reason);
493                         return -1;
494                 }
495                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
496                                    ctdb_db->db_name, new_reason));
497         } else if (old_reason) {
498                 ret = tdb_delete(tdb, key);
499                 if (ret != 0) {
500                         tdb_transaction_cancel(tdb);
501                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
502                                            tdb_name(tdb), ctdb_db->db_name,
503                                            ret, tdb_errorstr(tdb)));
504                         talloc_free(new_reason);
505                         return -1;
506                 }
507                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
508                                    ctdb_db->db_name));
509         }
510
511         ret = tdb_transaction_commit(tdb);
512         if (ret != TDB_SUCCESS) {
513                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
514                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
515                 talloc_free(new_reason);
516                 return -1;
517         }
518
519         talloc_free(old_reason);
520         ctdb_db->unhealthy_reason = new_reason;
521
522         return 0;
523 }
524
525 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
526                                      struct ctdb_db_context *ctdb_db)
527 {
528         time_t now = time(NULL);
529         char *new_path;
530         char *new_reason;
531         int ret;
532         struct tm *tm;
533
534         tm = gmtime(&now);
535
536         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
537         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
538                                    "%04u%02u%02u%02u%02u%02u.0Z",
539                                    ctdb_db->db_path,
540                                    tm->tm_year+1900, tm->tm_mon+1,
541                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
542                                    tm->tm_sec);
543         if (new_path == NULL) {
544                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
545                 return -1;
546         }
547
548         new_reason = talloc_asprintf(ctdb_db,
549                                      "ERROR - Backup of corrupted TDB in '%s'",
550                                      new_path);
551         if (new_reason == NULL) {
552                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
553                 return -1;
554         }
555         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
556         talloc_free(new_reason);
557         if (ret != 0) {
558                 DEBUG(DEBUG_CRIT,(__location__
559                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
560                                  ctdb_db->db_path));
561                 return -1;
562         }
563
564         ret = rename(ctdb_db->db_path, new_path);
565         if (ret != 0) {
566                 DEBUG(DEBUG_CRIT,(__location__
567                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
568                                   ctdb_db->db_path, new_path,
569                                   errno, strerror(errno)));
570                 talloc_free(new_path);
571                 return -1;
572         }
573
574         DEBUG(DEBUG_CRIT,(__location__
575                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
576                          ctdb_db->db_path, new_path));
577         talloc_free(new_path);
578         return 0;
579 }
580
581 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
582 {
583         struct ctdb_db_context *ctdb_db;
584         int ret;
585         int ok = 0;
586         int fail = 0;
587
588         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
589                 if (!ctdb_db->persistent) {
590                         continue;
591                 }
592
593                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
594                 if (ret != 0) {
595                         DEBUG(DEBUG_ALERT,(__location__
596                                            " load persistent health for '%s' failed\n",
597                                            ctdb_db->db_path));
598                         return -1;
599                 }
600
601                 if (ctdb_db->unhealthy_reason == NULL) {
602                         ok++;
603                         DEBUG(DEBUG_INFO,(__location__
604                                    " persistent db '%s' healthy\n",
605                                    ctdb_db->db_path));
606                         continue;
607                 }
608
609                 fail++;
610                 DEBUG(DEBUG_ALERT,(__location__
611                                    " persistent db '%s' unhealthy: %s\n",
612                                    ctdb_db->db_path,
613                                    ctdb_db->unhealthy_reason));
614         }
615         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
616               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
617                ok, fail));
618
619         if (fail != 0) {
620                 return -1;
621         }
622
623         return 0;
624 }
625
626
627 /*
628   mark a database - as healthy
629  */
630 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
631 {
632         uint32_t db_id = *(uint32_t *)indata.dptr;
633         struct ctdb_db_context *ctdb_db;
634         int ret;
635         bool may_recover = false;
636
637         ctdb_db = find_ctdb_db(ctdb, db_id);
638         if (!ctdb_db) {
639                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
640                 return -1;
641         }
642
643         if (ctdb_db->unhealthy_reason) {
644                 may_recover = true;
645         }
646
647         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
648         if (ret != 0) {
649                 DEBUG(DEBUG_ERR,(__location__
650                                  " ctdb_update_persistent_health(%s) failed\n",
651                                  ctdb_db->db_name));
652                 return -1;
653         }
654
655         if (may_recover && !ctdb->done_startup) {
656                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
657                                   ctdb_db->db_name));
658                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
659         }
660
661         return 0;
662 }
663
664 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
665                                    TDB_DATA indata,
666                                    TDB_DATA *outdata)
667 {
668         uint32_t db_id = *(uint32_t *)indata.dptr;
669         struct ctdb_db_context *ctdb_db;
670         int ret;
671
672         ctdb_db = find_ctdb_db(ctdb, db_id);
673         if (!ctdb_db) {
674                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
675                 return -1;
676         }
677
678         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR,(__location__
681                                  " ctdb_load_persistent_health(%s) failed\n",
682                                  ctdb_db->db_name));
683                 return -1;
684         }
685
686         *outdata = tdb_null;
687         if (ctdb_db->unhealthy_reason) {
688                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
689                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
690         }
691
692         return 0;
693 }
694
695
696 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
697 {
698         char *ropath;
699
700         if (ctdb_db->readonly) {
701                 return 0;
702         }
703
704         if (ctdb_db->persistent) {
705                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
706                 return -1;
707         }
708
709         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
710         if (ropath == NULL) {
711                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
712                 return -1;
713         }
714         ctdb_db->rottdb = tdb_open(ropath, 
715                               ctdb->tunable.database_hash_size, 
716                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
717                               O_CREAT|O_RDWR, 0);
718         if (ctdb_db->rottdb == NULL) {
719                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
720                 talloc_free(ropath);
721                 return -1;
722         }
723
724         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
725
726         ctdb_db->readonly = true;
727         talloc_free(ropath);
728         return 0;
729 }
730
731 /*
732   attach to a database, handling both persistent and non-persistent databases
733   return 0 on success, -1 on failure
734  */
735 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
736                              bool persistent, const char *unhealthy_reason,
737                              bool jenkinshash)
738 {
739         struct ctdb_db_context *ctdb_db, *tmp_db;
740         int ret;
741         struct TDB_DATA key;
742         unsigned tdb_flags;
743         int mode = 0600;
744         int remaining_tries = 0;
745
746         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
747         CTDB_NO_MEMORY(ctdb, ctdb_db);
748
749         ctdb_db->priority = 1;
750         ctdb_db->ctdb = ctdb;
751         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
752         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
753
754         key.dsize = strlen(db_name)+1;
755         key.dptr  = discard_const(db_name);
756         ctdb_db->db_id = ctdb_hash(&key);
757         ctdb_db->persistent = persistent;
758
759         if (!ctdb_db->persistent) {
760                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
761                 if (ctdb_db->delete_queue == NULL) {
762                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
763                 }
764
765                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
766         }
767
768         /* check for hash collisions */
769         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
770                 if (tmp_db->db_id == ctdb_db->db_id) {
771                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
772                                  tmp_db->db_id, db_name, tmp_db->db_name));
773                         talloc_free(ctdb_db);
774                         return -1;
775                 }
776         }
777
778         if (persistent) {
779                 if (unhealthy_reason) {
780                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
781                                                             unhealthy_reason, 0);
782                         if (ret != 0) {
783                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
784                                                    ctdb_db->db_name, unhealthy_reason, ret));
785                                 talloc_free(ctdb_db);
786                                 return -1;
787                         }
788                 }
789
790                 if (ctdb->max_persistent_check_errors > 0) {
791                         remaining_tries = 1;
792                 }
793                 if (ctdb->done_startup) {
794                         remaining_tries = 0;
795                 }
796
797                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
798                 if (ret != 0) {
799                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
800                                    ctdb_db->db_name, ret));
801                         talloc_free(ctdb_db);
802                         return -1;
803                 }
804         }
805
806         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
807                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
808                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
809                 talloc_free(ctdb_db);
810                 return -1;
811         }
812
813         if (ctdb_db->unhealthy_reason) {
814                 /* this is just a warning, but we want that in the log file! */
815                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
816                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
817         }
818
819         /* open the database */
820         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
821                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
822                                            db_name, ctdb->pnn);
823
824         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
825         if (ctdb->valgrinding) {
826                 tdb_flags |= TDB_NOMMAP;
827         }
828         tdb_flags |= TDB_DISALLOW_NESTING;
829         if (jenkinshash) {
830                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
831         }
832
833 again:
834         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
835                                       ctdb->tunable.database_hash_size, 
836                                       tdb_flags, 
837                                       O_CREAT|O_RDWR, mode);
838         if (ctdb_db->ltdb == NULL) {
839                 struct stat st;
840                 int saved_errno = errno;
841
842                 if (!persistent) {
843                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
844                                           ctdb_db->db_path,
845                                           saved_errno,
846                                           strerror(saved_errno)));
847                         talloc_free(ctdb_db);
848                         return -1;
849                 }
850
851                 if (remaining_tries == 0) {
852                         DEBUG(DEBUG_CRIT,(__location__
853                                           "Failed to open persistent tdb '%s': %d - %s\n",
854                                           ctdb_db->db_path,
855                                           saved_errno,
856                                           strerror(saved_errno)));
857                         talloc_free(ctdb_db);
858                         return -1;
859                 }
860
861                 ret = stat(ctdb_db->db_path, &st);
862                 if (ret != 0) {
863                         DEBUG(DEBUG_CRIT,(__location__
864                                           "Failed to open persistent tdb '%s': %d - %s\n",
865                                           ctdb_db->db_path,
866                                           saved_errno,
867                                           strerror(saved_errno)));
868                         talloc_free(ctdb_db);
869                         return -1;
870                 }
871
872                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
873                 if (ret != 0) {
874                         DEBUG(DEBUG_CRIT,(__location__
875                                           "Failed to open persistent tdb '%s': %d - %s\n",
876                                           ctdb_db->db_path,
877                                           saved_errno,
878                                           strerror(saved_errno)));
879                         talloc_free(ctdb_db);
880                         return -1;
881                 }
882
883                 remaining_tries--;
884                 mode = st.st_mode;
885                 goto again;
886         }
887
888         if (!persistent) {
889                 ctdb_check_db_empty(ctdb_db);
890         } else {
891                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
892                 if (ret != 0) {
893                         int fd;
894                         struct stat st;
895
896                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
897                                           ctdb_db->db_path, ret,
898                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
899                         if (remaining_tries == 0) {
900                                 talloc_free(ctdb_db);
901                                 return -1;
902                         }
903
904                         fd = tdb_fd(ctdb_db->ltdb->tdb);
905                         ret = fstat(fd, &st);
906                         if (ret != 0) {
907                                 DEBUG(DEBUG_CRIT,(__location__
908                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
909                                                   ctdb_db->db_path,
910                                                   errno,
911                                                   strerror(errno)));
912                                 talloc_free(ctdb_db);
913                                 return -1;
914                         }
915
916                         /* close the TDB */
917                         talloc_free(ctdb_db->ltdb);
918                         ctdb_db->ltdb = NULL;
919
920                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
921                         if (ret != 0) {
922                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
923                                                   ctdb_db->db_path));
924                                 talloc_free(ctdb_db);
925                                 return -1;
926                         }
927
928                         remaining_tries--;
929                         mode = st.st_mode;
930                         goto again;
931                 }
932         }
933
934         /* set up a rb tree we can use to track which records we have a 
935            fetch-lock in-flight for so we can defer any additional calls
936            for the same record.
937          */
938         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
939         if (ctdb_db->deferred_fetch == NULL) {
940                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
941                 talloc_free(ctdb_db);
942                 return -1;
943         }
944
945         DLIST_ADD(ctdb->db_list, ctdb_db);
946
947         /* setting this can help some high churn databases */
948         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
949
950         /* 
951            all databases support the "null" function. we need this in
952            order to do forced migration of records
953         */
954         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
955         if (ret != 0) {
956                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
957                 talloc_free(ctdb_db);
958                 return -1;
959         }
960
961         /* 
962            all databases support the "fetch" function. we need this
963            for efficient Samba3 ctdb fetch
964         */
965         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
966         if (ret != 0) {
967                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
968                 talloc_free(ctdb_db);
969                 return -1;
970         }
971
972         /* 
973            all databases support the "fetch_with_header" function. we need this
974            for efficient readonly record fetches
975         */
976         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
977         if (ret != 0) {
978                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
979                 talloc_free(ctdb_db);
980                 return -1;
981         }
982
983         ret = ctdb_vacuum_init(ctdb_db);
984         if (ret != 0) {
985                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
986                                   "database '%s'\n", ctdb_db->db_name));
987                 talloc_free(ctdb_db);
988                 return -1;
989         }
990
991
992         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
993         
994         /* success */
995         return 0;
996 }
997
998
/*
  state for one DB attach control whose processing has been postponed.
  Instances are linked into ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
	/* doubly linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context that owns the deferred_attach list */
	struct ctdb_context *ctdb;
	/* the postponed control packet */
	struct ctdb_req_control *c;
};
1004
1005
1006 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1007 {
1008         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1009
1010         return 0;
1011 }
1012
1013 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1014 {
1015         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1016         struct ctdb_context *ctdb = da_ctx->ctdb;
1017
1018         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1019         talloc_free(da_ctx);
1020 }
1021
1022 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1023 {
1024         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1025         struct ctdb_context *ctdb = da_ctx->ctdb;
1026
1027         /* This talloc-steals the packet ->c */
1028         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1029         talloc_free(da_ctx);
1030 }
1031
1032 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1033 {
1034         struct ctdb_deferred_attach_context *da_ctx;
1035
1036         /* call it from the main event loop as soon as the current event 
1037            finishes.
1038          */
1039         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1040                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1041                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1042         }
1043
1044         return 0;
1045 }
1046
1047 /*
1048   a client has asked to attach a new database
1049  */
1050 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1051                                TDB_DATA *outdata, uint64_t tdb_flags, 
1052                                bool persistent, uint32_t client_id,
1053                                struct ctdb_req_control *c,
1054                                bool *async_reply)
1055 {
1056         const char *db_name = (const char *)indata.dptr;
1057         struct ctdb_db_context *db;
1058         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1059         struct ctdb_client *client = NULL;
1060
1061         if (ctdb->tunable.allow_client_db_attach == 0) {
1062                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1063                                   "AllowClientDBAccess == 0\n", db_name));
1064                 return -1;
1065         }
1066
1067         /* dont allow any local clients to attach while we are in recovery mode
1068          * except for the recovery daemon.
1069          * allow all attach from the network since these are always from remote
1070          * recovery daemons.
1071          */
1072         if (client_id != 0) {
1073                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1074         }
1075         if (client != NULL) {
1076                 /* If the node is inactive it is not part of the cluster
1077                    and we should not allow clients to attach to any
1078                    databases
1079                 */
1080                 if (node->flags & NODE_FLAGS_INACTIVE) {
1081                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1082                         return -1;
1083                 }
1084
1085                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1086                  && client->pid != ctdb->recoverd_pid
1087                  && !ctdb->done_startup) {
1088                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1089
1090                         if (da_ctx == NULL) {
1091                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1092                                 return -1;
1093                         }
1094
1095                         da_ctx->ctdb = ctdb;
1096                         da_ctx->c = talloc_steal(da_ctx, c);
1097                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1098                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1099
1100                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1101
1102                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1103                         *async_reply = true;
1104                         return 0;
1105                 }
1106         }
1107
1108         /* the client can optionally pass additional tdb flags, but we
1109            only allow a subset of those on the database in ctdb. Note
1110            that tdb_flags is passed in via the (otherwise unused)
1111            srvid to the attach control */
1112         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1113
1114         /* see if we already have this name */
1115         db = ctdb_db_handle(ctdb, db_name);
1116         if (db) {
1117                 outdata->dptr  = (uint8_t *)&db->db_id;
1118                 outdata->dsize = sizeof(db->db_id);
1119                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1120                 return 0;
1121         }
1122
1123         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1124                 return -1;
1125         }
1126
1127         db = ctdb_db_handle(ctdb, db_name);
1128         if (!db) {
1129                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1130                 return -1;
1131         }
1132
1133         /* remember the flags the client has specified */
1134         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1135
1136         outdata->dptr  = (uint8_t *)&db->db_id;
1137         outdata->dsize = sizeof(db->db_id);
1138
1139         /* Try to ensure it's locked in mem */
1140         ctdb_lockdown_memory(ctdb);
1141
1142         /* tell all the other nodes about this database */
1143         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1144                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1145                                                 CTDB_CONTROL_DB_ATTACH,
1146                                  0, CTDB_CTRL_FLAG_NOREPLY,
1147                                  indata, NULL, NULL);
1148
1149         /* success */
1150         return 0;
1151 }
1152
1153
1154 /*
1155   attach to all existing persistent databases
1156  */
1157 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1158                                   const char *unhealthy_reason)
1159 {
1160         DIR *d;
1161         struct dirent *de;
1162
1163         /* open the persistent db directory and scan it for files */
1164         d = opendir(ctdb->db_directory_persistent);
1165         if (d == NULL) {
1166                 return 0;
1167         }
1168
1169         while ((de=readdir(d))) {
1170                 char *p, *s, *q;
1171                 size_t len = strlen(de->d_name);
1172                 uint32_t node;
1173                 int invalid_name = 0;
1174                 
1175                 s = talloc_strdup(ctdb, de->d_name);
1176                 CTDB_NO_MEMORY(ctdb, s);
1177
1178                 /* only accept names ending in .tdb */
1179                 p = strstr(s, ".tdb.");
1180                 if (len < 7 || p == NULL) {
1181                         talloc_free(s);
1182                         continue;
1183                 }
1184
1185                 /* only accept names ending with .tdb. and any number of digits */
1186                 q = p+5;
1187                 while (*q != 0 && invalid_name == 0) {
1188                         if (!isdigit(*q++)) {
1189                                 invalid_name = 1;
1190                         }
1191                 }
1192                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1193                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1194                         talloc_free(s);
1195                         continue;
1196                 }
1197                 p[4] = 0;
1198
1199                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1200                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1201                         closedir(d);
1202                         talloc_free(s);
1203                         return -1;
1204                 }
1205
1206                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1207
1208                 talloc_free(s);
1209         }
1210         closedir(d);
1211         return 0;
1212 }
1213
1214 int ctdb_attach_databases(struct ctdb_context *ctdb)
1215 {
1216         int ret;
1217         char *persistent_health_path = NULL;
1218         char *unhealthy_reason = NULL;
1219         bool first_try = true;
1220
1221         if (ctdb->db_directory == NULL) {
1222                 ctdb->db_directory = VARDIR "/ctdb";
1223         }
1224         if (ctdb->db_directory_persistent == NULL) {
1225                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1226         }
1227         if (ctdb->db_directory_state == NULL) {
1228                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1229         }
1230
1231         /* make sure the db directory exists */
1232         ret = mkdir(ctdb->db_directory, 0700);
1233         if (ret == -1 && errno != EEXIST) {
1234                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1235                          ctdb->db_directory));
1236                 return -1;
1237         }
1238
1239         /* make sure the persistent db directory exists */
1240         ret = mkdir(ctdb->db_directory_persistent, 0700);
1241         if (ret == -1 && errno != EEXIST) {
1242                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1243                          ctdb->db_directory_persistent));
1244                 return -1;
1245         }
1246
1247         /* make sure the internal state db directory exists */
1248         ret = mkdir(ctdb->db_directory_state, 0700);
1249         if (ret == -1 && errno != EEXIST) {
1250                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1251                          ctdb->db_directory_state));
1252                 return -1;
1253         }
1254
1255         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1256                                                  ctdb->db_directory_state,
1257                                                  PERSISTENT_HEALTH_TDB,
1258                                                  ctdb->pnn);
1259         if (persistent_health_path == NULL) {
1260                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1261                 return -1;
1262         }
1263
1264 again:
1265
1266         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1267                                                    0, TDB_DISALLOW_NESTING,
1268                                                    O_CREAT | O_RDWR, 0600);
1269         if (ctdb->db_persistent_health == NULL) {
1270                 struct tdb_wrap *tdb;
1271
1272                 if (!first_try) {
1273                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1274                                           persistent_health_path,
1275                                           errno,
1276                                           strerror(errno)));
1277                         talloc_free(persistent_health_path);
1278                         talloc_free(unhealthy_reason);
1279                         return -1;
1280                 }
1281                 first_try = false;
1282
1283                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1284                                                    persistent_health_path,
1285                                                    "was cleared after a failure",
1286                                                    "manual verification needed");
1287                 if (unhealthy_reason == NULL) {
1288                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1289                         talloc_free(persistent_health_path);
1290                         return -1;
1291                 }
1292
1293                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1294                                   persistent_health_path));
1295                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1296                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1297                                     O_CREAT | O_RDWR, 0600);
1298                 if (tdb) {
1299                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1300                                           persistent_health_path,
1301                                           errno,
1302                                           strerror(errno)));
1303                         talloc_free(persistent_health_path);
1304                         talloc_free(unhealthy_reason);
1305                         return -1;
1306                 }
1307
1308                 talloc_free(tdb);
1309                 goto again;
1310         }
1311         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1312         if (ret != 0) {
1313                 struct tdb_wrap *tdb;
1314
1315                 talloc_free(ctdb->db_persistent_health);
1316                 ctdb->db_persistent_health = NULL;
1317
1318                 if (!first_try) {
1319                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1320                                           persistent_health_path));
1321                         talloc_free(persistent_health_path);
1322                         talloc_free(unhealthy_reason);
1323                         return -1;
1324                 }
1325                 first_try = false;
1326
1327                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1328                                                    persistent_health_path,
1329                                                    "was cleared after a failure",
1330                                                    "manual verification needed");
1331                 if (unhealthy_reason == NULL) {
1332                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1333                         talloc_free(persistent_health_path);
1334                         return -1;
1335                 }
1336
1337                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1338                                   persistent_health_path));
1339                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1340                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1341                                     O_CREAT | O_RDWR, 0600);
1342                 if (tdb) {
1343                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1344                                           persistent_health_path,
1345                                           errno,
1346                                           strerror(errno)));
1347                         talloc_free(persistent_health_path);
1348                         talloc_free(unhealthy_reason);
1349                         return -1;
1350                 }
1351
1352                 talloc_free(tdb);
1353                 goto again;
1354         }
1355         talloc_free(persistent_health_path);
1356
1357         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1358         talloc_free(unhealthy_reason);
1359         if (ret != 0) {
1360                 return ret;
1361         }
1362
1363         return 0;
1364 }
1365
1366 /*
1367   called when a broadcast seqnum update comes in
1368  */
1369 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1370 {
1371         struct ctdb_db_context *ctdb_db;
1372         if (srcnode == ctdb->pnn) {
1373                 /* don't update ourselves! */
1374                 return 0;
1375         }
1376
1377         ctdb_db = find_ctdb_db(ctdb, db_id);
1378         if (!ctdb_db) {
1379                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1380                 return -1;
1381         }
1382
1383         if (ctdb_db->unhealthy_reason) {
1384                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1385                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1386                 return -1;
1387         }
1388
1389         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1390         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1391         return 0;
1392 }
1393
1394 /*
1395   timer to check for seqnum changes in a ltdb and propogate them
1396  */
1397 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1398                                    struct timeval t, void *p)
1399 {
1400         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1401         struct ctdb_context *ctdb = ctdb_db->ctdb;
1402         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1403         if (new_seqnum != ctdb_db->seqnum) {
1404                 /* something has changed - propogate it */
1405                 TDB_DATA data;
1406                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1407                 data.dsize = sizeof(uint32_t);
1408                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1409                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1410                                          data, NULL, NULL);             
1411         }
1412         ctdb_db->seqnum = new_seqnum;
1413
1414         /* setup a new timer */
1415         ctdb_db->seqnum_update =
1416                 event_add_timed(ctdb->ev, ctdb_db, 
1417                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1418                                 ctdb_ltdb_seqnum_check, ctdb_db);
1419 }
1420
1421 /*
1422   enable seqnum handling on this db
1423  */
1424 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1425 {
1426         struct ctdb_db_context *ctdb_db;
1427         ctdb_db = find_ctdb_db(ctdb, db_id);
1428         if (!ctdb_db) {
1429                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1430                 return -1;
1431         }
1432
1433         if (ctdb_db->seqnum_update == NULL) {
1434                 ctdb_db->seqnum_update =
1435                         event_add_timed(ctdb->ev, ctdb_db, 
1436                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1437                                         ctdb_ltdb_seqnum_check, ctdb_db);
1438         }
1439
1440         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1441         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1442         return 0;
1443 }
1444
1445 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1446 {
1447         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1448         struct ctdb_db_context *ctdb_db;
1449
1450         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1451         if (!ctdb_db) {
1452                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1453                 return 0;
1454         }
1455
1456         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1457                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1458                 return 0;
1459         }
1460
1461         ctdb_db->priority = db_prio->priority;
1462         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1463
1464         return 0;
1465 }
1466